/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.wikimedia.gobblin.publisher;

import java.io.IOException;
import java.net.URI;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.publisher.DataPublisher;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;

import lombok.extern.slf4j.Slf4j;


/**
 * IMPORTANT NOTES:
 *  - Gobblin published folders are expected to be of the form {PUBLISHERDIR}/{TABLE_NAME}/{PARTITION}.
 *    This means that the "writer.file.path.type" property is expected to be "tablename".
 *  - The Gobblin writer partition-scheme is expected to follow time-order when sorted alphabetically.
 *
 * Publisher output flag path: {PUBLISHERDIR}/{TABLE_NAME}/{PARTITION}/{FLAG}, written for
 * time-partitions whose boundary has been crossed by all kafka-partitions of the topic.
 *
 * The partitions to flag are processed by topic. The flag is written in every time-partition
 * folder that writers have written to, and for which every kafka-partition has also written
 * to a later time-partition.
 *
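 * For example (illustrative topic and partition values, not taken from a real job): if hourly
 * folders 2024-01-01-00 and 2024-01-01-01 were written for topic "my_topic", and every
 * kafka-partition has also written to 2024-01-01-02, both earlier folders get flagged:
 * <pre>
 *   {PUBLISHERDIR}/my_topic/2024-01-01-00/_IMPORTED
 *   {PUBLISHERDIR}/my_topic/2024-01-01-01/_IMPORTED
 * </pre>
 * Folder 2024-01-01-02 is not flagged yet, as some kafka-partition may still be writing to it.
 *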
 */
@Slf4j
public class TimePartitionedFlagDataPublisher extends DataPublisher {

    public static final String PUBLISHER_PUBLISHED_FLAGS_KEY = ConfigurationKeys.DATA_PUBLISHER_PREFIX + ".published.flags";

    public static final String PUBLISHER_TIME_PARTITION_FLAG_KEY = ConfigurationKeys.DATA_PUBLISHER_PREFIX + ".timepartition.flag";
    public static final String DEFAULT_PUBLISHER_TIME_PARTITION_FLAG = "_IMPORTED";
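    // Since ConfigurationKeys.DATA_PUBLISHER_PREFIX is "data.publisher", the flag name can be
    // overridden in the job configuration (illustrative value):
    //   data.publisher.timepartition.flag=_SUCCESS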

    private final String flag;

    private final Closer closer = Closer.create();
    private final ParallelRunnerWithTouch parallelRunner;

    private final Set<Path> publishedFlags = Sets.newHashSet();

    public TimePartitionedFlagDataPublisher(State state) throws IOException {
        super(state);

        int parallelRunnerThreads = state.getPropAsInt(ParallelRunner.PARALLEL_RUNNER_THREADS_KEY, ParallelRunner.DEFAULT_PARALLEL_RUNNER_THREADS);
        this.flag = state.getProp(PUBLISHER_TIME_PARTITION_FLAG_KEY, DEFAULT_PUBLISHER_TIME_PARTITION_FLAG);
        log.info("Time-partition flag for dataset {} is: {}", state.getProp(ConfigurationKeys.DATASET_URN_KEY), flag);

        FileSystem publisherFs = getPublisherFileSystem(state);
        // This publisher writes empty files - no checksum needed
        publisherFs.setWriteChecksum(false);

        this.parallelRunner = new ParallelRunnerWithTouch(parallelRunnerThreads, publisherFs);
        this.closer.register(this.parallelRunner);
    }

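    /**
     * Builds the {@link FileSystem} that flags are published to. All job properties are copied
     * into the Hadoop configuration, and the filesystem URI is resolved from
     * {@link ConfigurationKeys#DATA_PUBLISHER_FILE_SYSTEM_URI}, falling back to
     * {@link ConfigurationKeys#WRITER_FILE_SYSTEM_URI} and then to the local filesystem.
     */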
    public static FileSystem getPublisherFileSystem(State state) throws IOException {
        Configuration conf = new Configuration();

        // Add all job configuration properties so they are picked up by Hadoop
        for (String key : state.getPropertyNames()) {
            conf.set(key, state.getProp(key));
        }
        URI writerUri = URI.create(state.getProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, ConfigurationKeys.LOCAL_FS_URI));
        URI publisherUri = URI.create(state.getProp(ConfigurationKeys.DATA_PUBLISHER_FILE_SYSTEM_URI, writerUri.toString()));
        return FileSystem.get(publisherUri, conf);
    }

    @Deprecated
    @Override
    public void initialize() {
        // no initialization required
    }

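    /**
     * Records the published flag paths in the job state under
     * {@link #PUBLISHER_PUBLISHED_FLAGS_KEY} before closing the underlying parallel-runner.
     */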
    @Override
    public void close() throws IOException {
        try {
            for (Path path : this.publishedFlags) {
                this.state.appendToSetProp(PUBLISHER_PUBLISHED_FLAGS_KEY, path.toString());
            }
        } finally {
            this.closer.close();
        }
    }

    /**
     * Loops over the workUnitStates and extracts, for each processed table, the time-partitions
     * to flag as:
     *  - any written time-partition smaller than the minimum, across table-partitions, of the
     *    maximum written time-partitions.
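     *
     * For example (illustrative values): if kafka-partition 0 wrote time-partitions
     * {10, 11, 12} and kafka-partition 1 wrote {10, 11}, the minimum of the maximums
     * is 11, so only time-partition 10 is flagged.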
     */
    private Map<String, Set<String>> getPartitionsToFlagByTable(Collection<? extends WorkUnitState> states) {
        List<String> tablesNotToFlag = new ArrayList<>();

        // This map contains, for valid tables:
        //  - The minimum of the maximum time-partitions across table-partitions
        //  - The list of written time-partitions whose value is before the minimum of maximums
        Map<String, Map.Entry<String, Set<String>>> tablesTimePartitions = new HashMap<>();

        // Loop over single-task states
        for (WorkUnitState workUnitState : states) {
            Preconditions.checkArgument(workUnitState.contains(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY));

            String tableName = workUnitState.getProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY);
            // Don't process tables having a failed task
            if (workUnitState.getWorkingState() == WorkUnitState.WorkingState.FAILED) {
                tablesTimePartitions.remove(tableName);
                tablesNotToFlag.add(tableName);
                log.debug("Marking table {} as NOT to be flagged due to failed tasks", tableName);
            }

            // Process only committed tasks with actual data on accepted tables
            if (workUnitState.getWorkingState() == WorkUnitState.WorkingState.COMMITTED &&
                    workUnitState.getPropAsInt(ConfigurationKeys.WRITER_RECORDS_WRITTEN) > 0 &&
                    !tablesNotToFlag.contains(tableName)) {

                // Loop over state properties to find written partitions, sorting them in descending order
                TreeSet<String> writtenPartitions = new TreeSet<>(Collections.reverseOrder());
                for (Map.Entry<Object, Object> property : workUnitState.getProperties().entrySet()) {
                    if (((String) property.getKey()).startsWith(ConfigurationKeys.WRITER_PARTITION_PATH_KEY)) {
                        writtenPartitions.add((String) property.getValue());
                    }
                }

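                // The maximum written time-partition is deliberately not flagged: its
                // kafka-partition may still be receiving records for it, so it can only be
                // flagged once every kafka-partition has written to a later time-partition.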
                // Remove the maximum: writtenPartitions now only contains
                // candidate time-partitions to be flagged.
                String writtenMax = writtenPartitions.pollFirst();
                // Only process time-partitions if there are some
                if (null != writtenMax) {
                    // Initialization of table data if not in map
                    if (!tablesTimePartitions.containsKey(tableName)) {
                        tablesTimePartitions.put(tableName, new AbstractMap.SimpleEntry<>(writtenMax, new HashSet<>(writtenPartitions)));
                    } else { // Merge writtenPartitions with existing time-partitions data for table
                        String tableMax = tablesTimePartitions.get(tableName).getKey();
                        Set<String> tablePartitionsToFlag = tablesTimePartitions.get(tableName).getValue();

                        // Add written partitions to flag to the set
                        tablePartitionsToFlag.addAll(writtenPartitions);
                        // Define the new max (writtenMax can't be null, as writtenPartitions was not empty)
                        final String newMax = (writtenMax.compareTo(tableMax) < 0) ? writtenMax : tableMax;
                        // Clear the set of partitions higher than or equal to newMax
                        tablePartitionsToFlag.removeIf(p -> p.compareTo(newMax) >= 0);

                        tablesTimePartitions.put(tableName, new AbstractMap.SimpleEntry<>(newMax, tablePartitionsToFlag));
                    }
                }
            }
        }
        Map<String, Set<String>> result = new HashMap<>();
        tablesTimePartitions.forEach((k, v) -> result.put(k, v.getValue()));

        return result;
    }

    /**
     * Creates tasks asynchronously writing the flags defined in the tablesPartitions parameter.
     * @param tablesPartitions A map of tables and their partitions to be flagged
     */
    private void writeFlags(Map<String, Set<String>> tablesPartitions) {
        String publisherFinalBaseDir = state.getProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR);
        // Used to validate that each folder to flag is consistent with the published folders
        Set<String> publishedDirs = state.getPropAsSet(ConfigurationKeys.PUBLISHER_DIRS, "");

        for (Map.Entry<String, Set<String>> tableAndPartitions: tablesPartitions.entrySet()) {
            String table = tableAndPartitions.getKey();
            for (String partitionToFlag: tableAndPartitions.getValue()) {
                String pathToFlag = publisherFinalBaseDir + "/" + table + "/" + partitionToFlag;
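                // e.g. /path/to/published/my_topic/2024-01-01-00 (illustrative path; the
                // actual time-partition scheme depends on the writer configuration)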
                if (publishedDirs.contains(pathToFlag)) {
                    Path flagPath = new Path(pathToFlag, flag);
                    parallelRunner.touchPath(flagPath);
                    publishedFlags.add(flagPath);
                } else {
                    log.warn("Path-to-flag {} is not present in the list of published-directories", pathToFlag);
                }
            }
        }
    }

    @Override
    public void publishData(Collection<? extends WorkUnitState> states) throws IOException {

        log.debug("Compute time-partitions to flag");
        Map<String, Set<String>> tablesPartitions = getPartitionsToFlagByTable(states);

        log.debug("Write flags in time-partition folders");
        writeFlags(tablesPartitions);

        List<String> publishedFlagsString = this.publishedFlags.stream().map(Path::toString).collect(Collectors.toList());
        log.info("{} time-partition flags published: {}", this.publishedFlags.size(), publishedFlagsString);
    }

    @Override
    public void publishMetadata(Collection<? extends WorkUnitState> states) throws IOException {
        // Nothing to do
    }

}