View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *    http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.wikimedia.gobblin.copy;
18  
19  import static org.apache.gobblin.util.retry.RetryerFactory.*;
20  import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TYPE;
21  
22  import java.io.IOException;
23  import java.net.URI;
24  import java.nio.charset.StandardCharsets;
25  import java.util.*;
26  import java.util.concurrent.TimeUnit;
27  
28  import org.apache.commons.io.IOUtils;
29  import org.apache.gobblin.config.ConfigBuilder;
30  import org.apache.gobblin.configuration.ConfigurationKeys;
31  import org.apache.gobblin.configuration.SourceState;
32  import org.apache.gobblin.configuration.State;
33  import org.apache.gobblin.configuration.WorkUnitState;
34  import org.apache.gobblin.dataset.DatasetConstants;
35  import org.apache.gobblin.dataset.DatasetDescriptor;
36  import org.apache.gobblin.dataset.Descriptor;
37  import org.apache.gobblin.dataset.PartitionDescriptor;
38  import org.apache.gobblin.metadata.MetadataMerger;
39  import org.apache.gobblin.metadata.types.StaticStringMetadataMerger;
40  import org.apache.gobblin.metrics.event.lineage.LineageInfo;
41  import org.apache.gobblin.publisher.SingleTaskDataPublisher;
42  import org.apache.gobblin.util.ForkOperatorUtils;
43  import org.apache.gobblin.util.HadoopUtils;
44  import org.apache.gobblin.util.ParallelRunner;
45  import org.apache.gobblin.util.WriterUtils;
46  import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
47  import org.apache.gobblin.writer.FsDataWriter;
48  import org.apache.gobblin.writer.FsWriterMetrics;
49  import org.apache.gobblin.writer.PartitionIdentifier;
50  import org.apache.gobblin.writer.PartitionedDataWriter;
51  import org.apache.hadoop.conf.Configuration;
52  import org.apache.hadoop.fs.*;
53  import org.apache.hadoop.fs.permission.FsPermission;
54  import org.slf4j.Logger;
55  import org.slf4j.LoggerFactory;
56  
57  import com.google.common.base.Optional;
58  import com.google.common.collect.*;
59  import com.google.common.io.Closer;
60  import com.typesafe.config.Config;
61  import com.typesafe.config.ConfigFactory;
62  import com.typesafe.config.ConfigRenderOptions;
63  
64  import de.thetaphi.forbiddenapis.SuppressForbidden;
65  import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
66  
67  
68  /**
69   * A basic implementation of {@link SingleTaskDataPublisher} that publishes the data from the writer output directory
70   * to the final output directory.
71   *
72   * <p>
73   * The final output directory is specified by {@link ConfigurationKeys#DATA_PUBLISHER_FINAL_DIR}. The output of each
74   * writer is written to this directory. Each individual writer can also specify a path in the config key
75   * {@link ConfigurationKeys#WRITER_FILE_PATH}. Then the final output data for a writer will be
76   * {@link ConfigurationKeys#DATA_PUBLISHER_FINAL_DIR}/{@link ConfigurationKeys#WRITER_FILE_PATH}. If the
77   * {@link ConfigurationKeys#WRITER_FILE_PATH} is not specified, a default one is assigned. The default path is
78   * constructed in the {@link org.apache.gobblin.source.workunit.Extract#getOutputFilePath()} method.
79   * </p>
80   *
81   * <p>
82   * This publisher records all dirs it publishes to in property {@link ConfigurationKeys#PUBLISHER_DIRS}. Each time it
83   * publishes a {@link Path}, if the path is a directory, it records this path. If the path is a file, it records the
84   * parent directory of the path. To change this behavior one may override
85   * {@link #recordPublisherOutputDirs(Path, Path, int)}.
86   * </p>
87   *
88   * <p>
89   * This is an updated copy of {@link org.apache.gobblin.publisher.BaseDataPublisher}
90   * in gobblin-core module. This file should be deleted in favor of the upstream version when possible.
 * Updates are:
 * </p>
 * <ul>
 *  <li>Extract method {@link #addWriterOutputToNewDir} from {@link #publishData} so that
 *  TimePartitionedDataPublisher can override this behavior -- https://github.com/apache/gobblin/pull/3409</li>
 * </ul>
97   */
98  @SuppressForbidden
99  @SuppressFBWarnings(justification = "Class copied from upstream Gobblin")
100 public class BaseDataPublisher extends SingleTaskDataPublisher {
101 
102     private static final Logger LOG = LoggerFactory.getLogger(org.apache.gobblin.publisher.BaseDataPublisher.class);
103 
104     protected final int numBranches;
105     protected final List<FileSystem> writerFileSystemByBranches;
106     protected final List<FileSystem> publisherFileSystemByBranches;
107     protected final List<FileSystem> metaDataWriterFileSystemByBranches;
108     protected final List<Optional<String>> publisherFinalDirOwnerGroupsByBranches;
109     protected final List<FsPermission> permissions;
110     protected final Closer closer;
111     protected final Closer parallelRunnerCloser;
112     protected final int parallelRunnerThreads;
113     protected final Map<String, ParallelRunner> parallelRunners = Maps.newHashMap();
114     protected final Set<Path> publisherOutputDirs = Sets.newHashSet();
115     protected final Optional<LineageInfo> lineageInfo;
116 
117     /* Each partition in each branch may have separate metadata. The metadata mergers are responsible
118      * for aggregating this information from all workunits so it can be published.
119      */
120     protected final Map<PartitionIdentifier, MetadataMerger<String>> metadataMergers;
121     protected final boolean shouldRetry;
122 
123     static final String DATA_PUBLISHER_RETRY_PREFIX = ConfigurationKeys.DATA_PUBLISHER_PREFIX + ".retry.";
124     static final String PUBLISH_RETRY_ENABLED = DATA_PUBLISHER_RETRY_PREFIX + "enabled";
125 
126     static final Config PUBLISH_RETRY_DEFAULTS;
127     protected final Config retrierConfig;
128 
129     static {
130         Map<String, Object> configMap =
131                 ImmutableMap.<String, Object>builder()
132                         .put(RETRY_TIME_OUT_MS, TimeUnit.MINUTES.toMillis(2L))   //Overall retry for 2 minutes
133                         .put(RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(5L)) //Try to retry 5 seconds
134                         .put(RETRY_MULTIPLIER, 2L) // Muliply by 2 every attempt
135                         .put(RETRY_TYPE, RetryType.EXPONENTIAL.name())
136                         .build();
137         PUBLISH_RETRY_DEFAULTS = ConfigFactory.parseMap(configMap);
138     };
139 
140     public BaseDataPublisher(State state)
141             throws IOException {
142         super(state);
143         this.closer = Closer.create();
144         Configuration conf = new Configuration();
145 
146         // Add all job configuration properties so they are picked up by Hadoop
147         for (String key : this.getState().getPropertyNames()) {
148             conf.set(key, this.getState().getProp(key));
149         }
150 
151         // Extract LineageInfo from state
152         if (state instanceof SourceState) {
153             lineageInfo = LineageInfo.getLineageInfo(((SourceState) state).getBroker());
154         } else if (state instanceof WorkUnitState) {
155             lineageInfo = LineageInfo.getLineageInfo(((WorkUnitState) state).getTaskBrokerNullable());
156         } else {
157             lineageInfo = Optional.absent();
158         }
159 
160         this.numBranches = this.getState().getPropAsInt(ConfigurationKeys.FORK_BRANCHES_KEY, 1);
161         this.shouldRetry = this.getState().getPropAsBoolean(PUBLISH_RETRY_ENABLED, false);
162 
163         this.writerFileSystemByBranches = Lists.newArrayListWithCapacity(this.numBranches);
164         this.publisherFileSystemByBranches = Lists.newArrayListWithCapacity(this.numBranches);
165         this.metaDataWriterFileSystemByBranches = Lists.newArrayListWithCapacity(this.numBranches);
166         this.publisherFinalDirOwnerGroupsByBranches = Lists.newArrayListWithCapacity(this.numBranches);
167         this.permissions = Lists.newArrayListWithCapacity(this.numBranches);
168         this.metadataMergers = new HashMap<>();
169 
170         // Get a FileSystem instance for each branch
171         for (int i = 0; i < this.numBranches; i++) {
172             URI writerUri = URI.create(this.getState().getProp(
173                     ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, this.numBranches, i),
174                     ConfigurationKeys.LOCAL_FS_URI));
175             this.writerFileSystemByBranches.add(FileSystem.get(writerUri, conf));
176 
177             URI publisherUri = URI.create(this.getState().getProp(ForkOperatorUtils
178                             .getPropertyNameForBranch(ConfigurationKeys.DATA_PUBLISHER_FILE_SYSTEM_URI, this.numBranches, i),
179                     writerUri.toString()));
180             this.publisherFileSystemByBranches.add(FileSystem.get(publisherUri, conf));
181             this.metaDataWriterFileSystemByBranches.add(FileSystem.get(publisherUri, conf));
182 
183             // The group(s) will be applied to the final publisher output directory(ies)
184             this.publisherFinalDirOwnerGroupsByBranches.add(Optional.fromNullable(this.getState().getProp(ForkOperatorUtils
185                     .getPropertyNameForBranch(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR_GROUP, this.numBranches, i))));
186 
187             // The permission(s) will be applied to all directories created by the publisher,
188             // which do NOT include directories created by the writer and moved by the publisher.
189             // The permissions of those directories are controlled by writer.file.permissions and writer.dir.permissions.
190             this.permissions.add(new FsPermission(state.getPropAsShortWithRadix(
191                     ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.DATA_PUBLISHER_PERMISSIONS, this.numBranches, i),
192                     FsPermission.getDefault().toShort(), ConfigurationKeys.PERMISSION_PARSING_RADIX)));
193         }
194 
195         if (this.shouldRetry) {
196             this.retrierConfig = ConfigBuilder.create()
197                     .loadProps(this.getState().getProperties(), DATA_PUBLISHER_RETRY_PREFIX)
198                     .build()
199                     .withFallback(PUBLISH_RETRY_DEFAULTS);
200             LOG.info("Retry enabled for publish with config : "+ retrierConfig.root().render(ConfigRenderOptions.concise()));
201 
202         }else {
203             LOG.info("Retry disabled for publish.");
204             this.retrierConfig = WriterUtils.NO_RETRY_CONFIG;
205         }
206 
207 
208         this.parallelRunnerThreads =
209                 state.getPropAsInt(ParallelRunner.PARALLEL_RUNNER_THREADS_KEY, ParallelRunner.DEFAULT_PARALLEL_RUNNER_THREADS);
210         this.parallelRunnerCloser = Closer.create();
211     }
212 
213     private MetadataMerger<String> buildMetadataMergerForBranch(String metadataFromConfig, int branchId,
214                                                                 Path existingMetadataPath) {
215         // Legacy behavior -- if we shouldn't publish writer state, instantiate a static metadata merger
216         // that just returns the metadata from config (if any)
217         if (!shouldPublishWriterMetadataForBranch(branchId)) {
218             return new StaticStringMetadataMerger(metadataFromConfig);
219         }
220 
221         String keyName = ForkOperatorUtils
222                 .getPropertyNameForBranch(ConfigurationKeys.DATA_PUBLISH_WRITER_METADATA_MERGER_NAME_KEY, this.numBranches,
223                         branchId);
224         String className =
225                 this.getState().getProp(keyName, ConfigurationKeys.DATA_PUBLISH_WRITER_METADATA_MERGER_NAME_DEFAULT);
226 
227         try {
228             Class<?> mdClass = Class.forName(className);
229 
230             // If the merger understands properties, use that constructor; otherwise use the default
231             // parameter-less ctor
232             @SuppressWarnings("unchecked")
233             Object merger = GobblinConstructorUtils
234                     .invokeFirstConstructor(mdClass, Collections.<Object>singletonList(this.getState().getProperties()),
235                             Collections.<Object>emptyList());
236 
237             try {
238                 @SuppressWarnings("unchecked")
239                 MetadataMerger<String> casted = (MetadataMerger<String>) merger;
240 
241                 // Merge existing metadata from the partition if it exists..
242                 String existingMetadata = loadExistingMetadata(existingMetadataPath, branchId);
243                 if (existingMetadata != null) {
244                     casted.update(existingMetadata);
245                 }
246 
247                 // Then metadata from the config...
248                 if (metadataFromConfig != null) {
249                     casted.update(metadataFromConfig);
250                 }
251                 return casted;
252             } catch (ClassCastException e) {
253                 throw new IllegalArgumentException(className + " does not implement the MetadataMerger interface", e);
254             }
255         } catch (ClassNotFoundException e) {
256             throw new IllegalArgumentException("Specified metadata merger class " + className + " not found!", e);
257         } catch (ReflectiveOperationException e) {
258             throw new IllegalArgumentException("Error building merger class " + className, e);
259         }
260     }
261 
262     /**
263      * Read in existing metadata as a UTF8 string.
264      */
265     private String loadExistingMetadata(Path metadataFilename, int branchId) {
266         try {
267             FileSystem fsForBranch = writerFileSystemByBranches.get(branchId);
268             if (!fsForBranch.exists(metadataFilename)) {
269                 return null;
270             }
271             FSDataInputStream existingMetadata = writerFileSystemByBranches.get(branchId).open(metadataFilename);
272             return IOUtils.toString(existingMetadata, StandardCharsets.UTF_8);
273         } catch (IOException e) {
274             LOG.warn("IOException {} while trying to read existing metadata {} - treating as null", e.getMessage(),
275                     metadataFilename.toString());
276             return null;
277         }
278     }
279 
280     @Override
281     public void initialize()
282             throws IOException {
283         // Nothing needs to be done since the constructor already initializes the publisher.
284     }
285 
286     @Override
287     public void close()
288             throws IOException {
289         try {
290             for (Path path : this.publisherOutputDirs) {
291                 this.state.appendToSetProp(ConfigurationKeys.PUBLISHER_DIRS, path.toString());
292             }
293             this.state.setProp(ConfigurationKeys.PUBLISHER_LATEST_FILE_ARRIVAL_TIMESTAMP, System.currentTimeMillis());
294         } finally {
295             this.closer.close();
296         }
297     }
298 
299     private void addLineageInfo(WorkUnitState state, int branchId) {
300         if (!this.lineageInfo.isPresent()) {
301             LOG.info("Will not add lineage info");
302             return;
303         }
304 
305         // Final dataset descriptor
306         DatasetDescriptor datasetDescriptor = createDestinationDescriptor(state, branchId);
307 
308         List<PartitionDescriptor> partitions = PartitionedDataWriter.getPartitionInfoAndClean(state, branchId);
309         List<Descriptor> descriptors = new ArrayList<>();
310         if (partitions.size() == 0) {
311             // Report as dataset level lineage
312             descriptors.add(datasetDescriptor);
313         } else {
314             // Report as partition level lineage
315             for (PartitionDescriptor partition : partitions) {
316                 descriptors.add(partition.copyWithNewDataset(datasetDescriptor));
317             }
318         }
319         this.lineageInfo.get().putDestination(descriptors, branchId, state);
320     }
321 
322     /**
323      * Create destination dataset descriptor
324      */
325     protected DatasetDescriptor createDestinationDescriptor(WorkUnitState state, int branchId) {
326         Path publisherOutputDir = getPublisherOutputDir(state, branchId);
327         FileSystem fs = this.publisherFileSystemByBranches.get(branchId);
328         DatasetDescriptor destination = new DatasetDescriptor(fs.getScheme(), fs.getUri(), publisherOutputDir.toString());
329         destination.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
330         destination.addMetadata(DatasetConstants.BRANCH, String.valueOf(branchId));
331         return destination;
332     }
333 
334     @Override
335     public void publishData(WorkUnitState state)
336             throws IOException {
337         for (int branchId = 0; branchId < this.numBranches; branchId++) {
338             publishSingleTaskData(state, branchId);
339         }
340         this.parallelRunnerCloser.close();
341     }
342 
343     /**
344      * This method publishes output data for a single task based on the given {@link WorkUnitState}.
345      * Output data from other tasks won't be published even if they are in the same folder.
346      */
347     private void publishSingleTaskData(WorkUnitState state, int branchId)
348             throws IOException {
349         publishData(state, branchId, true, new HashSet<Path>());
350         addLineageInfo(state, branchId);
351     }
352 
353     @Override
354     public void publishData(Collection<? extends WorkUnitState> states)
355             throws IOException {
356 
357         // We need a Set to collect unique writer output paths as multiple tasks may belong to the same extract. Tasks that
358         // belong to the same Extract will by default have the same output directory
359         Set<Path> writerOutputPathsMoved = Sets.newHashSet();
360 
361         for (WorkUnitState workUnitState : states) {
362             for (int branchId = 0; branchId < this.numBranches; branchId++) {
363                 publishMultiTaskData(workUnitState, branchId, writerOutputPathsMoved);
364             }
365         }
366 
367         this.parallelRunnerCloser.close();
368     }
369 
370     /**
371      * This method publishes task output data for the given {@link WorkUnitState}, but if there are output data of
372      * other tasks in the same folder, it may also publish those data.
373      */
374     protected void publishMultiTaskData(WorkUnitState state, int branchId, Set<Path> writerOutputPathsMoved)
375             throws IOException {
376         publishData(state, branchId, false, writerOutputPathsMoved);
377         addLineageInfo(state, branchId);
378     }
379 
380     protected void publishData(WorkUnitState state, int branchId, boolean publishSingleTaskData,
381                                Set<Path> writerOutputPathsMoved)
382             throws IOException {
383         // Get a ParallelRunner instance for moving files in parallel
384         ParallelRunner parallelRunner = this.getParallelRunner(this.writerFileSystemByBranches.get(branchId));
385 
386         // The directory where the workUnitState wrote its output data.
387         Path writerOutputDir = WriterUtils.getWriterOutputDir(state, this.numBranches, branchId);
388 
389         if (!this.writerFileSystemByBranches.get(branchId).exists(writerOutputDir)) {
390             LOG.warn(String.format("Branch %d of WorkUnit %s produced no data", branchId, state.getId()));
391             return;
392         }
393 
394         // The directory where the final output directory for this job will be placed.
395         // It is a combination of DATA_PUBLISHER_FINAL_DIR and WRITER_FILE_PATH.
396         Path publisherOutputDir = getPublisherOutputDir(state, branchId);
397 
398         if (publishSingleTaskData) {
399             // Create final output directory
400             WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId), publisherOutputDir,
401                     this.permissions.get(branchId), retrierConfig);
402             addSingleTaskWriterOutputToExistingDir(writerOutputDir, publisherOutputDir, state, branchId, parallelRunner);
403         } else {
404             if (writerOutputPathsMoved.contains(writerOutputDir)) {
405                 // This writer output path has already been moved for another task of the same extract
406                 // If publishSingleTaskData=true, writerOutputPathMoved is ignored.
407                 return;
408             }
409 
410             if (this.publisherFileSystemByBranches.get(branchId).exists(publisherOutputDir)) {
411                 // The final output directory already exists, check if the job is configured to replace it.
412                 // If publishSingleTaskData=true, final output directory is never replaced.
413                 boolean replaceFinalOutputDir = this.getState().getPropAsBoolean(ForkOperatorUtils
414                         .getPropertyNameForBranch(ConfigurationKeys.DATA_PUBLISHER_REPLACE_FINAL_DIR, this.numBranches, branchId));
415 
416                 // If the final output directory is not configured to be replaced, put new data to the existing directory.
417                 if (!replaceFinalOutputDir) {
418                     addWriterOutputToExistingDir(writerOutputDir, publisherOutputDir, state, branchId, parallelRunner);
419                     writerOutputPathsMoved.add(writerOutputDir);
420                     return;
421                 }
422 
423                 // Delete the final output directory if it is configured to be replaced
424                 LOG.info("Deleting publisher output dir " + publisherOutputDir);
425                 this.publisherFileSystemByBranches.get(branchId).delete(publisherOutputDir, true);
426             }
427             // If we reach this point, publisherOutputDir doesn't exist
428             addWriterOutputToNewDir(writerOutputDir, publisherOutputDir, state, branchId, parallelRunner);
429             writerOutputPathsMoved.add(writerOutputDir);
430         }
431     }
432 
433     /**
434      * Get the output directory path this {@link org.apache.gobblin.publisher.BaseDataPublisher} will write to.
435      *
436      * <p>
437      *   This is the default implementation. Subclasses of {@link org.apache.gobblin.publisher.BaseDataPublisher} may override this
438      *   to write to a custom directory or write using a custom directory structure or naming pattern.
439      * </p>
440      *
441      * @param workUnitState a {@link WorkUnitState} object
442      * @param branchId the fork branch ID
443      * @return the output directory path this {@link org.apache.gobblin.publisher.BaseDataPublisher} will write to
444      */
445     protected Path getPublisherOutputDir(WorkUnitState workUnitState, int branchId) {
446         return WriterUtils.getDataPublisherFinalDir(workUnitState, this.numBranches, branchId);
447     }
448 
449     protected void addSingleTaskWriterOutputToExistingDir(Path writerOutputDir, Path publisherOutputDir,
450                                                           WorkUnitState workUnitState, int branchId, ParallelRunner parallelRunner)
451             throws IOException {
452         String outputFilePropName = ForkOperatorUtils
453                 .getPropertyNameForBranch(ConfigurationKeys.WRITER_FINAL_OUTPUT_FILE_PATHS, this.numBranches, branchId);
454 
455         if (!workUnitState.contains(outputFilePropName)) {
456             LOG.warn("Missing property " + outputFilePropName + ". This task may have pulled no data.");
457             return;
458         }
459 
460         Iterable<String> taskOutputFiles = workUnitState.getPropAsSet(outputFilePropName);
461         for (String taskOutputFile : taskOutputFiles) {
462             Path taskOutputPath = new Path(taskOutputFile);
463             if (!this.writerFileSystemByBranches.get(branchId).exists(taskOutputPath)) {
464                 LOG.warn("Task output file " + taskOutputFile + " doesn't exist.");
465                 continue;
466             }
467             String pathSuffix = taskOutputFile
468                     .substring(taskOutputFile.indexOf(writerOutputDir.toString()) + writerOutputDir.toString().length() + 1);
469             Path publisherOutputPath = new Path(publisherOutputDir, pathSuffix);
470             WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
471                     publisherOutputPath.getParent(), this.permissions.get(branchId), retrierConfig);
472 
473             movePath(parallelRunner, workUnitState, taskOutputPath, publisherOutputPath, branchId);
474         }
475     }
476 
477     protected void addWriterOutputToNewDir(Path writerOutput, Path publisherOutput,
478                                            WorkUnitState workUnitState, int branchId, ParallelRunner parallelRunner)
479             throws IOException {
480         // Create the parent directory of the final output directory if it does not exist
481         WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
482                 publisherOutput.getParent(), this.permissions.get(branchId), retrierConfig);
483         movePath(parallelRunner, state, writerOutput, publisherOutput, branchId);
484     }
485 
486     protected void addWriterOutputToExistingDir(Path writerOutputDir, Path publisherOutputDir,
487                                                 WorkUnitState workUnitState, int branchId, ParallelRunner parallelRunner)
488             throws IOException {
489         boolean preserveFileName = workUnitState.getPropAsBoolean(ForkOperatorUtils
490                         .getPropertyNameForBranch(ConfigurationKeys.SOURCE_FILEBASED_PRESERVE_FILE_NAME, this.numBranches, branchId),
491                 false);
492         // Go through each file in writerOutputDir and move it into publisherOutputDir
493         for (FileStatus status : this.writerFileSystemByBranches.get(branchId).listStatus(writerOutputDir)) {
494 
495             // Preserve the file name if configured, use specified name otherwise
496             Path finalOutputPath = preserveFileName ? new Path(publisherOutputDir, workUnitState.getProp(ForkOperatorUtils
497                     .getPropertyNameForBranch(ConfigurationKeys.DATA_PUBLISHER_FINAL_NAME, this.numBranches, branchId)))
498                     : new Path(publisherOutputDir, status.getPath().getName());
499 
500             movePath(parallelRunner, workUnitState, status.getPath(), finalOutputPath, branchId);
501         }
502     }
503 
504     protected void movePath(ParallelRunner parallelRunner, State state, Path src, Path dst, int branchId)
505             throws IOException {
506         LOG.info(String.format("Moving %s to %s", src, dst));
507         boolean overwrite = state.getPropAsBoolean(ConfigurationKeys.DATA_PUBLISHER_OVERWRITE_ENABLED, false);
508         this.publisherOutputDirs.addAll(recordPublisherOutputDirs(src, dst, branchId));
509         parallelRunner.movePath(src, this.publisherFileSystemByBranches.get(branchId), dst, overwrite,
510                 this.publisherFinalDirOwnerGroupsByBranches.get(branchId));
511     }
512 
513     protected Collection<Path> recordPublisherOutputDirs(Path src, Path dst, int branchId)
514             throws IOException {
515 
516         // Getting file status from src rather than dst, because at this time dst doesn't yet exist.
517         // If src is a dir, add dst to the set of paths. Otherwise, add dst's parent.
518         if (this.writerFileSystemByBranches.get(branchId).getFileStatus(src).isDirectory()) {
519             return ImmutableList.<Path>of(dst);
520         }
521         return ImmutableList.<Path>of(dst.getParent());
522     }
523 
524     private ParallelRunner getParallelRunner(FileSystem fs) {
525         String uri = fs.getUri().toString();
526         if (!this.parallelRunners.containsKey(uri)) {
527             this.parallelRunners
528                     .put(uri, this.parallelRunnerCloser.register(new ParallelRunner(this.parallelRunnerThreads, fs)));
529         }
530         return this.parallelRunners.get(uri);
531     }
532 
533     /**
534      * Merge all of the metadata output from each work-unit and publish the merged record.
535      * @param states States from all tasks
536      * @throws IOException If there is an error publishing the file
537      */
538     @Override
539     public void publishMetadata(Collection<? extends WorkUnitState> states)
540             throws IOException {
541         Set<String> partitions = new HashSet<>();
542 
543         // There should be one merged metadata file per branch; first merge all of the pieces together
544         mergeMetadataAndCollectPartitionNames(states, partitions);
545         partitions.removeIf(Objects::isNull);
546 
547         // Now, pick an arbitrary WorkUnitState to get config information around metadata such as
548         // the desired output filename. We assume that publisher config settings
549         // are the same across all workunits so it doesn't really matter which workUnit we retrieve this information
550         // from.
551 
552         WorkUnitState anyState = states.iterator().next();
553         for (int branchId = 0; branchId < numBranches; branchId++) {
554             String mdOutputPath = getMetadataOutputPathFromState(anyState, branchId);
555             String userSpecifiedPath = getUserSpecifiedOutputPathFromState(anyState, branchId);
556 
557             if (partitions.isEmpty() || userSpecifiedPath != null) {
558                 publishMetadata(getMergedMetadataForPartitionAndBranch(null, branchId),
559                         branchId,
560                         getMetadataOutputFileForBranch(anyState, branchId));
561             } else {
562                 String metadataFilename = getMetadataFileNameForBranch(anyState, branchId);
563                 if (mdOutputPath == null || metadataFilename == null) {
564                     LOG.info("Metadata filename not set for branch " + String.valueOf(branchId) + ": not publishing metadata.");
565                     continue;
566                 }
567 
568                 for (String partition : partitions) {
569                     publishMetadata(getMergedMetadataForPartitionAndBranch(partition, branchId),
570                             branchId,
571                             new Path(new Path(mdOutputPath, partition), metadataFilename));
572                 }
573             }
574         }
575     }
576 
577     /*
578      * Metadata that we publish can come from several places:
579      *  - It can be passed in job config (DATA_PUBLISHER_METADATA_STR)
580      *  - It can be picked up from previous runs of a job (if the output partition already exists)
581      *  -- The above two are handled when we construct a new MetadataMerger
582      *
583      *  - The source/converters/writers associated with each branch of a job may add their own metadata
584      *    (eg: this dataset is encrypted using AES256). This is returned by getIntermediateMetadataFromState()
585      *    and fed into the MetadataMerger.
586      *  - FsWriterMetrics can be emitted and rolled up into metadata. These metrics are specific to a {partition, branch}
587      *    combo as they mention per-output file metrics. This is also fed into metadata mergers.
588      *
589      *  Each writer should only be a part of one branch, but it may be responsible for multiple partitions.
590      */
591     private void mergeMetadataAndCollectPartitionNames(Collection<? extends WorkUnitState> states,
592                                                        Set<String> partitionPaths) {
593 
594         for (WorkUnitState workUnitState : states) {
595             // First extract the partition paths and metrics from the work unit. This is essentially
596             // equivalent to grouping FsWriterMetrics by {partitionKey, branchId} and extracting
597             // all partitionPaths into a set.
598             Map<PartitionIdentifier, Set<FsWriterMetrics>> metricsByPartition = new HashMap<>();
599             boolean partitionFound = false;
600             for (Map.Entry<Object, Object> property : workUnitState.getProperties().entrySet()) {
601                 if (((String) property.getKey()).startsWith(ConfigurationKeys.WRITER_PARTITION_PATH_KEY)) {
602                     partitionPaths.add((String) property.getValue());
603                     partitionFound = true;
604                 } else if (((String) property.getKey()).startsWith(FsDataWriter.FS_WRITER_METRICS_KEY)) {
605                     try {
606                         FsWriterMetrics parsedMetrics = FsWriterMetrics.fromJson((String) property.getValue());
607                         partitionPaths.add(parsedMetrics.getPartitionInfo().getPartitionKey());
608                         Set<FsWriterMetrics> metricsForPartition =
609                                 metricsByPartition.computeIfAbsent(parsedMetrics.getPartitionInfo(), k -> new HashSet<>());
610                         metricsForPartition.add(parsedMetrics);
611                     } catch (IOException e) {
612                         LOG.warn("Error parsing metrics from property {} - ignoring", (String) property.getValue());
613                     }
614                 }
615             }
616 
617             // no specific partitions - add null as a placeholder
618             if (!partitionFound) {
619                 partitionPaths.add(null);
620             }
621 
622             final String configBasedMetadata = getMetadataFromWorkUnitState(workUnitState);
623 
624             // Now update all metadata mergers with branch metadata + partition metrics
625             for (int branchId = 0; branchId < numBranches; branchId++) {
626                 for (String partition : partitionPaths) {
627                     PartitionIdentifier partitionIdentifier = new PartitionIdentifier(partition, branchId);
628                     final int branch = branchId;
629                     MetadataMerger<String> mdMerger = metadataMergers.computeIfAbsent(partitionIdentifier,
630                             k -> buildMetadataMergerForBranch(configBasedMetadata, branch,
631                                     getMetadataOutputFileForBranch(workUnitState, branch)));
632                     if (shouldPublishWriterMetadataForBranch(branchId)) {
633                         String md = getIntermediateMetadataFromState(workUnitState, branchId);
634                         mdMerger.update(md);
635                         Set<FsWriterMetrics> metricsForPartition =
636                                 metricsByPartition.getOrDefault(partitionIdentifier, Collections.emptySet());
637                         for (FsWriterMetrics metrics : metricsForPartition) {
638                             mdMerger.update(metrics);
639                         }
640                     }
641                 }
642             }
643         }
644     }
645 
646 
647     /**
648      * Publish metadata for each branch. We expect the metadata to be of String format and
649      * populated in either the WRITER_MERGED_METADATA_KEY state or the WRITER_METADATA_KEY configuration key.
650      */
651     @Override
652     public void publishMetadata(WorkUnitState state)
653             throws IOException {
654         publishMetadata(Collections.singleton(state));
655     }
656 
657     /**
658      * Publish metadata to a set of paths
659      */
660     private void publishMetadata(String metadataValue, int branchId, Path metadataOutputPath)
661             throws IOException {
662         try {
663             if (metadataOutputPath == null) {
664                 LOG.info("Metadata output path not set for branch " + String.valueOf(branchId) + ", not publishing.");
665                 return;
666             }
667 
668             if (metadataValue == null) {
669                 LOG.info("No metadata collected for branch " + String.valueOf(branchId) + ", not publishing.");
670                 return;
671             }
672 
673             FileSystem fs = this.metaDataWriterFileSystemByBranches.get(branchId);
674 
675             if (!fs.exists(metadataOutputPath.getParent())) {
676                 WriterUtils.mkdirsWithRecursivePermissionWithRetry(fs, metadataOutputPath, this.permissions.get(branchId), retrierConfig);
677             }
678 
679             //Delete the file if metadata already exists
680             if (fs.exists(metadataOutputPath)) {
681                 HadoopUtils.deletePath(fs, metadataOutputPath, false);
682             }
683             LOG.info("Writing metadata for branch " + String.valueOf(branchId) + " to " + metadataOutputPath.toString());
684             try (FSDataOutputStream outputStream = fs.create(metadataOutputPath)) {
685                 outputStream.write(metadataValue.getBytes(StandardCharsets.UTF_8));
686             }
687         } catch (IOException e) {
688             LOG.error("Metadata file is not generated: " + e, e);
689         }
690     }
691 
692     private String getMetadataFileNameForBranch(WorkUnitState state, int branchId) {
693         // Note: This doesn't follow the pattern elsewhere in Gobblin where we have branch specific config
694         // parameters! Leaving this way for backwards compatibility.
695         String filePrefix = state.getProp(ConfigurationKeys.DATA_PUBLISHER_METADATA_OUTPUT_FILE);
696         return ForkOperatorUtils.getPropertyNameForBranch(filePrefix, this.numBranches, branchId);
697     }
698 
699     private Path getMetadataOutputFileForBranch(WorkUnitState state, int branchId) {
700         String metaDataOutputDirStr = getMetadataOutputPathFromState(state, branchId);
701         String fileName = getMetadataFileNameForBranch(state, branchId);
702         if (metaDataOutputDirStr == null || fileName == null) {
703             return null;
704         }
705         return new Path(metaDataOutputDirStr, fileName);
706     }
707 
708     private String getUserSpecifiedOutputPathFromState(WorkUnitState state, int branchId) {
709         String outputDir = state.getProp(ForkOperatorUtils
710                 .getPropertyNameForBranch(ConfigurationKeys.DATA_PUBLISHER_METADATA_OUTPUT_DIR, this.numBranches, branchId));
711 
712         // An older version of this code did not get a branch specific PUBLISHER_METADATA_OUTPUT_DIR so fallback
713         // for compatibility's sake
714         if (outputDir == null && this.numBranches > 1) {
715             outputDir = state.getProp(ConfigurationKeys.DATA_PUBLISHER_METADATA_OUTPUT_DIR);
716             if (outputDir != null) {
717                 LOG.warn("Branches are configured for this job but a per branch metadata output directory was not set;"
718                         + " is this intended?");
719             }
720         }
721 
722         return outputDir;
723     }
724 
725     private String getMetadataOutputPathFromState(WorkUnitState state, int branchId) {
726         String outputDir = getUserSpecifiedOutputPathFromState(state, branchId);
727 
728         // Just write out to the regular output path if a metadata specific path hasn't been provided
729         if (outputDir == null) {
730             String publisherOutputDir = getPublisherOutputDir(state, branchId).toString();
731             LOG.info("Missing metadata output directory path : " + ConfigurationKeys.DATA_PUBLISHER_METADATA_OUTPUT_DIR
732                     + " in the config; assuming outputPath " + publisherOutputDir);
733             return publisherOutputDir;
734         }
735 
736         return outputDir;
737     }
738 
739     /*
740      * Retrieve intermediate metadata (eg the metadata stored by each writer) for a given state and branch id.
741      */
742     private String getIntermediateMetadataFromState(WorkUnitState state, int branchId) {
743         return state.getProp(
744                 ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.WRITER_METADATA_KEY, this.numBranches, branchId));
745     }
746 
747     /*
748      * Get the merged metadata given a workunit state and branch id. This method assumes
749      * all intermediate metadata has already been passed to the MetadataMerger.
750      *
751      * If metadata mergers are not configured, instead return the metadata from job config that was
752      * passed in by the user.
753      */
754     private String getMergedMetadataForPartitionAndBranch(String partitionId, int branchId) {
755         String mergedMd = null;
756         MetadataMerger<String> mergerForBranch = metadataMergers.get(new PartitionIdentifier(partitionId, branchId));
757         if (mergerForBranch != null) {
758             mergedMd = mergerForBranch.getMergedMetadata();
759             if (mergedMd == null) {
760                 LOG.warn("Metadata merger for branch {} returned null - bug in merger?", branchId);
761             }
762         }
763 
764         return mergedMd;
765     }
766 
767     private boolean shouldPublishWriterMetadataForBranch(int branchId) {
768         String keyName = ForkOperatorUtils
769                 .getPropertyNameForBranch(ConfigurationKeys.DATA_PUBLISH_WRITER_METADATA_KEY, this.numBranches, branchId);
770         return this.getState().getPropAsBoolean(keyName, false);
771     }
772 
773     /**
774      * Retrieve metadata from job state config
775      */
776     private String getMetadataFromWorkUnitState(WorkUnitState workUnitState) {
777         return workUnitState.getProp(ConfigurationKeys.DATA_PUBLISHER_METADATA_STR);
778     }
779 
780     /**
781      * The BaseDataPublisher relies on publishData() to create and clean-up the output directories, so data
782      * has to be published before the metadata can be.
783      */
784     @Override
785     protected boolean shouldPublishMetadataFirst() {
786         return false;
787     }
788 }