1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.wikimedia.gobblin.copy;
18
19 import static org.apache.gobblin.util.retry.RetryerFactory.*;
20 import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TYPE;
21
22 import java.io.IOException;
23 import java.net.URI;
24 import java.nio.charset.StandardCharsets;
25 import java.util.*;
26 import java.util.concurrent.TimeUnit;
27
28 import org.apache.commons.io.IOUtils;
29 import org.apache.gobblin.config.ConfigBuilder;
30 import org.apache.gobblin.configuration.ConfigurationKeys;
31 import org.apache.gobblin.configuration.SourceState;
32 import org.apache.gobblin.configuration.State;
33 import org.apache.gobblin.configuration.WorkUnitState;
34 import org.apache.gobblin.dataset.DatasetConstants;
35 import org.apache.gobblin.dataset.DatasetDescriptor;
36 import org.apache.gobblin.dataset.Descriptor;
37 import org.apache.gobblin.dataset.PartitionDescriptor;
38 import org.apache.gobblin.metadata.MetadataMerger;
39 import org.apache.gobblin.metadata.types.StaticStringMetadataMerger;
40 import org.apache.gobblin.metrics.event.lineage.LineageInfo;
41 import org.apache.gobblin.publisher.SingleTaskDataPublisher;
42 import org.apache.gobblin.util.ForkOperatorUtils;
43 import org.apache.gobblin.util.HadoopUtils;
44 import org.apache.gobblin.util.ParallelRunner;
45 import org.apache.gobblin.util.WriterUtils;
46 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
47 import org.apache.gobblin.writer.FsDataWriter;
48 import org.apache.gobblin.writer.FsWriterMetrics;
49 import org.apache.gobblin.writer.PartitionIdentifier;
50 import org.apache.gobblin.writer.PartitionedDataWriter;
51 import org.apache.hadoop.conf.Configuration;
52 import org.apache.hadoop.fs.*;
53 import org.apache.hadoop.fs.permission.FsPermission;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57 import com.google.common.base.Optional;
58 import com.google.common.collect.*;
59 import com.google.common.io.Closer;
60 import com.typesafe.config.Config;
61 import com.typesafe.config.ConfigFactory;
62 import com.typesafe.config.ConfigRenderOptions;
63
64 import de.thetaphi.forbiddenapis.SuppressForbidden;
65 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98 @SuppressForbidden
99 @SuppressFBWarnings(justification = "Class copied from upstream Gobblin")
100 public class BaseDataPublisher extends SingleTaskDataPublisher {
101
102 private static final Logger LOG = LoggerFactory.getLogger(org.apache.gobblin.publisher.BaseDataPublisher.class);
103
104 protected final int numBranches;
105 protected final List<FileSystem> writerFileSystemByBranches;
106 protected final List<FileSystem> publisherFileSystemByBranches;
107 protected final List<FileSystem> metaDataWriterFileSystemByBranches;
108 protected final List<Optional<String>> publisherFinalDirOwnerGroupsByBranches;
109 protected final List<FsPermission> permissions;
110 protected final Closer closer;
111 protected final Closer parallelRunnerCloser;
112 protected final int parallelRunnerThreads;
113 protected final Map<String, ParallelRunner> parallelRunners = Maps.newHashMap();
114 protected final Set<Path> publisherOutputDirs = Sets.newHashSet();
115 protected final Optional<LineageInfo> lineageInfo;
116
117
118
119
120 protected final Map<PartitionIdentifier, MetadataMerger<String>> metadataMergers;
121 protected final boolean shouldRetry;
122
123 static final String DATA_PUBLISHER_RETRY_PREFIX = ConfigurationKeys.DATA_PUBLISHER_PREFIX + ".retry.";
124 static final String PUBLISH_RETRY_ENABLED = DATA_PUBLISHER_RETRY_PREFIX + "enabled";
125
126 static final Config PUBLISH_RETRY_DEFAULTS;
127 protected final Config retrierConfig;
128
129 static {
130 Map<String, Object> configMap =
131 ImmutableMap.<String, Object>builder()
132 .put(RETRY_TIME_OUT_MS, TimeUnit.MINUTES.toMillis(2L))
133 .put(RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(5L))
134 .put(RETRY_MULTIPLIER, 2L)
135 .put(RETRY_TYPE, RetryType.EXPONENTIAL.name())
136 .build();
137 PUBLISH_RETRY_DEFAULTS = ConfigFactory.parseMap(configMap);
138 };
139
140 public BaseDataPublisher(State state)
141 throws IOException {
142 super(state);
143 this.closer = Closer.create();
144 Configuration conf = new Configuration();
145
146
147 for (String key : this.getState().getPropertyNames()) {
148 conf.set(key, this.getState().getProp(key));
149 }
150
151
152 if (state instanceof SourceState) {
153 lineageInfo = LineageInfo.getLineageInfo(((SourceState) state).getBroker());
154 } else if (state instanceof WorkUnitState) {
155 lineageInfo = LineageInfo.getLineageInfo(((WorkUnitState) state).getTaskBrokerNullable());
156 } else {
157 lineageInfo = Optional.absent();
158 }
159
160 this.numBranches = this.getState().getPropAsInt(ConfigurationKeys.FORK_BRANCHES_KEY, 1);
161 this.shouldRetry = this.getState().getPropAsBoolean(PUBLISH_RETRY_ENABLED, false);
162
163 this.writerFileSystemByBranches = Lists.newArrayListWithCapacity(this.numBranches);
164 this.publisherFileSystemByBranches = Lists.newArrayListWithCapacity(this.numBranches);
165 this.metaDataWriterFileSystemByBranches = Lists.newArrayListWithCapacity(this.numBranches);
166 this.publisherFinalDirOwnerGroupsByBranches = Lists.newArrayListWithCapacity(this.numBranches);
167 this.permissions = Lists.newArrayListWithCapacity(this.numBranches);
168 this.metadataMergers = new HashMap<>();
169
170
171 for (int i = 0; i < this.numBranches; i++) {
172 URI writerUri = URI.create(this.getState().getProp(
173 ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, this.numBranches, i),
174 ConfigurationKeys.LOCAL_FS_URI));
175 this.writerFileSystemByBranches.add(FileSystem.get(writerUri, conf));
176
177 URI publisherUri = URI.create(this.getState().getProp(ForkOperatorUtils
178 .getPropertyNameForBranch(ConfigurationKeys.DATA_PUBLISHER_FILE_SYSTEM_URI, this.numBranches, i),
179 writerUri.toString()));
180 this.publisherFileSystemByBranches.add(FileSystem.get(publisherUri, conf));
181 this.metaDataWriterFileSystemByBranches.add(FileSystem.get(publisherUri, conf));
182
183
184 this.publisherFinalDirOwnerGroupsByBranches.add(Optional.fromNullable(this.getState().getProp(ForkOperatorUtils
185 .getPropertyNameForBranch(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR_GROUP, this.numBranches, i))));
186
187
188
189
190 this.permissions.add(new FsPermission(state.getPropAsShortWithRadix(
191 ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.DATA_PUBLISHER_PERMISSIONS, this.numBranches, i),
192 FsPermission.getDefault().toShort(), ConfigurationKeys.PERMISSION_PARSING_RADIX)));
193 }
194
195 if (this.shouldRetry) {
196 this.retrierConfig = ConfigBuilder.create()
197 .loadProps(this.getState().getProperties(), DATA_PUBLISHER_RETRY_PREFIX)
198 .build()
199 .withFallback(PUBLISH_RETRY_DEFAULTS);
200 LOG.info("Retry enabled for publish with config : "+ retrierConfig.root().render(ConfigRenderOptions.concise()));
201
202 }else {
203 LOG.info("Retry disabled for publish.");
204 this.retrierConfig = WriterUtils.NO_RETRY_CONFIG;
205 }
206
207
208 this.parallelRunnerThreads =
209 state.getPropAsInt(ParallelRunner.PARALLEL_RUNNER_THREADS_KEY, ParallelRunner.DEFAULT_PARALLEL_RUNNER_THREADS);
210 this.parallelRunnerCloser = Closer.create();
211 }
212
213 private MetadataMerger<String> buildMetadataMergerForBranch(String metadataFromConfig, int branchId,
214 Path existingMetadataPath) {
215
216
217 if (!shouldPublishWriterMetadataForBranch(branchId)) {
218 return new StaticStringMetadataMerger(metadataFromConfig);
219 }
220
221 String keyName = ForkOperatorUtils
222 .getPropertyNameForBranch(ConfigurationKeys.DATA_PUBLISH_WRITER_METADATA_MERGER_NAME_KEY, this.numBranches,
223 branchId);
224 String className =
225 this.getState().getProp(keyName, ConfigurationKeys.DATA_PUBLISH_WRITER_METADATA_MERGER_NAME_DEFAULT);
226
227 try {
228 Class<?> mdClass = Class.forName(className);
229
230
231
232 @SuppressWarnings("unchecked")
233 Object merger = GobblinConstructorUtils
234 .invokeFirstConstructor(mdClass, Collections.<Object>singletonList(this.getState().getProperties()),
235 Collections.<Object>emptyList());
236
237 try {
238 @SuppressWarnings("unchecked")
239 MetadataMerger<String> casted = (MetadataMerger<String>) merger;
240
241
242 String existingMetadata = loadExistingMetadata(existingMetadataPath, branchId);
243 if (existingMetadata != null) {
244 casted.update(existingMetadata);
245 }
246
247
248 if (metadataFromConfig != null) {
249 casted.update(metadataFromConfig);
250 }
251 return casted;
252 } catch (ClassCastException e) {
253 throw new IllegalArgumentException(className + " does not implement the MetadataMerger interface", e);
254 }
255 } catch (ClassNotFoundException e) {
256 throw new IllegalArgumentException("Specified metadata merger class " + className + " not found!", e);
257 } catch (ReflectiveOperationException e) {
258 throw new IllegalArgumentException("Error building merger class " + className, e);
259 }
260 }
261
262
263
264
265 private String loadExistingMetadata(Path metadataFilename, int branchId) {
266 try {
267 FileSystem fsForBranch = writerFileSystemByBranches.get(branchId);
268 if (!fsForBranch.exists(metadataFilename)) {
269 return null;
270 }
271 FSDataInputStream existingMetadata = writerFileSystemByBranches.get(branchId).open(metadataFilename);
272 return IOUtils.toString(existingMetadata, StandardCharsets.UTF_8);
273 } catch (IOException e) {
274 LOG.warn("IOException {} while trying to read existing metadata {} - treating as null", e.getMessage(),
275 metadataFilename.toString());
276 return null;
277 }
278 }
279
280 @Override
281 public void initialize()
282 throws IOException {
283
284 }
285
286 @Override
287 public void close()
288 throws IOException {
289 try {
290 for (Path path : this.publisherOutputDirs) {
291 this.state.appendToSetProp(ConfigurationKeys.PUBLISHER_DIRS, path.toString());
292 }
293 this.state.setProp(ConfigurationKeys.PUBLISHER_LATEST_FILE_ARRIVAL_TIMESTAMP, System.currentTimeMillis());
294 } finally {
295 this.closer.close();
296 }
297 }
298
299 private void addLineageInfo(WorkUnitState state, int branchId) {
300 if (!this.lineageInfo.isPresent()) {
301 LOG.info("Will not add lineage info");
302 return;
303 }
304
305
306 DatasetDescriptor datasetDescriptor = createDestinationDescriptor(state, branchId);
307
308 List<PartitionDescriptor> partitions = PartitionedDataWriter.getPartitionInfoAndClean(state, branchId);
309 List<Descriptor> descriptors = new ArrayList<>();
310 if (partitions.size() == 0) {
311
312 descriptors.add(datasetDescriptor);
313 } else {
314
315 for (PartitionDescriptor partition : partitions) {
316 descriptors.add(partition.copyWithNewDataset(datasetDescriptor));
317 }
318 }
319 this.lineageInfo.get().putDestination(descriptors, branchId, state);
320 }
321
322
323
324
325 protected DatasetDescriptor createDestinationDescriptor(WorkUnitState state, int branchId) {
326 Path publisherOutputDir = getPublisherOutputDir(state, branchId);
327 FileSystem fs = this.publisherFileSystemByBranches.get(branchId);
328 DatasetDescriptor destination = new DatasetDescriptor(fs.getScheme(), fs.getUri(), publisherOutputDir.toString());
329 destination.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
330 destination.addMetadata(DatasetConstants.BRANCH, String.valueOf(branchId));
331 return destination;
332 }
333
334 @Override
335 public void publishData(WorkUnitState state)
336 throws IOException {
337 for (int branchId = 0; branchId < this.numBranches; branchId++) {
338 publishSingleTaskData(state, branchId);
339 }
340 this.parallelRunnerCloser.close();
341 }
342
343
344
345
346
347 private void publishSingleTaskData(WorkUnitState state, int branchId)
348 throws IOException {
349 publishData(state, branchId, true, new HashSet<Path>());
350 addLineageInfo(state, branchId);
351 }
352
353 @Override
354 public void publishData(Collection<? extends WorkUnitState> states)
355 throws IOException {
356
357
358
359 Set<Path> writerOutputPathsMoved = Sets.newHashSet();
360
361 for (WorkUnitState workUnitState : states) {
362 for (int branchId = 0; branchId < this.numBranches; branchId++) {
363 publishMultiTaskData(workUnitState, branchId, writerOutputPathsMoved);
364 }
365 }
366
367 this.parallelRunnerCloser.close();
368 }
369
370
371
372
373
374 protected void publishMultiTaskData(WorkUnitState state, int branchId, Set<Path> writerOutputPathsMoved)
375 throws IOException {
376 publishData(state, branchId, false, writerOutputPathsMoved);
377 addLineageInfo(state, branchId);
378 }
379
380 protected void publishData(WorkUnitState state, int branchId, boolean publishSingleTaskData,
381 Set<Path> writerOutputPathsMoved)
382 throws IOException {
383
384 ParallelRunner parallelRunner = this.getParallelRunner(this.writerFileSystemByBranches.get(branchId));
385
386
387 Path writerOutputDir = WriterUtils.getWriterOutputDir(state, this.numBranches, branchId);
388
389 if (!this.writerFileSystemByBranches.get(branchId).exists(writerOutputDir)) {
390 LOG.warn(String.format("Branch %d of WorkUnit %s produced no data", branchId, state.getId()));
391 return;
392 }
393
394
395
396 Path publisherOutputDir = getPublisherOutputDir(state, branchId);
397
398 if (publishSingleTaskData) {
399
400 WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId), publisherOutputDir,
401 this.permissions.get(branchId), retrierConfig);
402 addSingleTaskWriterOutputToExistingDir(writerOutputDir, publisherOutputDir, state, branchId, parallelRunner);
403 } else {
404 if (writerOutputPathsMoved.contains(writerOutputDir)) {
405
406
407 return;
408 }
409
410 if (this.publisherFileSystemByBranches.get(branchId).exists(publisherOutputDir)) {
411
412
413 boolean replaceFinalOutputDir = this.getState().getPropAsBoolean(ForkOperatorUtils
414 .getPropertyNameForBranch(ConfigurationKeys.DATA_PUBLISHER_REPLACE_FINAL_DIR, this.numBranches, branchId));
415
416
417 if (!replaceFinalOutputDir) {
418 addWriterOutputToExistingDir(writerOutputDir, publisherOutputDir, state, branchId, parallelRunner);
419 writerOutputPathsMoved.add(writerOutputDir);
420 return;
421 }
422
423
424 LOG.info("Deleting publisher output dir " + publisherOutputDir);
425 this.publisherFileSystemByBranches.get(branchId).delete(publisherOutputDir, true);
426 }
427
428 addWriterOutputToNewDir(writerOutputDir, publisherOutputDir, state, branchId, parallelRunner);
429 writerOutputPathsMoved.add(writerOutputDir);
430 }
431 }
432
433
434
435
436
437
438
439
440
441
442
443
444
445 protected Path getPublisherOutputDir(WorkUnitState workUnitState, int branchId) {
446 return WriterUtils.getDataPublisherFinalDir(workUnitState, this.numBranches, branchId);
447 }
448
449 protected void addSingleTaskWriterOutputToExistingDir(Path writerOutputDir, Path publisherOutputDir,
450 WorkUnitState workUnitState, int branchId, ParallelRunner parallelRunner)
451 throws IOException {
452 String outputFilePropName = ForkOperatorUtils
453 .getPropertyNameForBranch(ConfigurationKeys.WRITER_FINAL_OUTPUT_FILE_PATHS, this.numBranches, branchId);
454
455 if (!workUnitState.contains(outputFilePropName)) {
456 LOG.warn("Missing property " + outputFilePropName + ". This task may have pulled no data.");
457 return;
458 }
459
460 Iterable<String> taskOutputFiles = workUnitState.getPropAsSet(outputFilePropName);
461 for (String taskOutputFile : taskOutputFiles) {
462 Path taskOutputPath = new Path(taskOutputFile);
463 if (!this.writerFileSystemByBranches.get(branchId).exists(taskOutputPath)) {
464 LOG.warn("Task output file " + taskOutputFile + " doesn't exist.");
465 continue;
466 }
467 String pathSuffix = taskOutputFile
468 .substring(taskOutputFile.indexOf(writerOutputDir.toString()) + writerOutputDir.toString().length() + 1);
469 Path publisherOutputPath = new Path(publisherOutputDir, pathSuffix);
470 WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
471 publisherOutputPath.getParent(), this.permissions.get(branchId), retrierConfig);
472
473 movePath(parallelRunner, workUnitState, taskOutputPath, publisherOutputPath, branchId);
474 }
475 }
476
477 protected void addWriterOutputToNewDir(Path writerOutput, Path publisherOutput,
478 WorkUnitState workUnitState, int branchId, ParallelRunner parallelRunner)
479 throws IOException {
480
481 WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
482 publisherOutput.getParent(), this.permissions.get(branchId), retrierConfig);
483 movePath(parallelRunner, state, writerOutput, publisherOutput, branchId);
484 }
485
486 protected void addWriterOutputToExistingDir(Path writerOutputDir, Path publisherOutputDir,
487 WorkUnitState workUnitState, int branchId, ParallelRunner parallelRunner)
488 throws IOException {
489 boolean preserveFileName = workUnitState.getPropAsBoolean(ForkOperatorUtils
490 .getPropertyNameForBranch(ConfigurationKeys.SOURCE_FILEBASED_PRESERVE_FILE_NAME, this.numBranches, branchId),
491 false);
492
493 for (FileStatus status : this.writerFileSystemByBranches.get(branchId).listStatus(writerOutputDir)) {
494
495
496 Path finalOutputPath = preserveFileName ? new Path(publisherOutputDir, workUnitState.getProp(ForkOperatorUtils
497 .getPropertyNameForBranch(ConfigurationKeys.DATA_PUBLISHER_FINAL_NAME, this.numBranches, branchId)))
498 : new Path(publisherOutputDir, status.getPath().getName());
499
500 movePath(parallelRunner, workUnitState, status.getPath(), finalOutputPath, branchId);
501 }
502 }
503
504 protected void movePath(ParallelRunner parallelRunner, State state, Path src, Path dst, int branchId)
505 throws IOException {
506 LOG.info(String.format("Moving %s to %s", src, dst));
507 boolean overwrite = state.getPropAsBoolean(ConfigurationKeys.DATA_PUBLISHER_OVERWRITE_ENABLED, false);
508 this.publisherOutputDirs.addAll(recordPublisherOutputDirs(src, dst, branchId));
509 parallelRunner.movePath(src, this.publisherFileSystemByBranches.get(branchId), dst, overwrite,
510 this.publisherFinalDirOwnerGroupsByBranches.get(branchId));
511 }
512
513 protected Collection<Path> recordPublisherOutputDirs(Path src, Path dst, int branchId)
514 throws IOException {
515
516
517
518 if (this.writerFileSystemByBranches.get(branchId).getFileStatus(src).isDirectory()) {
519 return ImmutableList.<Path>of(dst);
520 }
521 return ImmutableList.<Path>of(dst.getParent());
522 }
523
524 private ParallelRunner getParallelRunner(FileSystem fs) {
525 String uri = fs.getUri().toString();
526 if (!this.parallelRunners.containsKey(uri)) {
527 this.parallelRunners
528 .put(uri, this.parallelRunnerCloser.register(new ParallelRunner(this.parallelRunnerThreads, fs)));
529 }
530 return this.parallelRunners.get(uri);
531 }
532
533
534
535
536
537
538 @Override
539 public void publishMetadata(Collection<? extends WorkUnitState> states)
540 throws IOException {
541 Set<String> partitions = new HashSet<>();
542
543
544 mergeMetadataAndCollectPartitionNames(states, partitions);
545 partitions.removeIf(Objects::isNull);
546
547
548
549
550
551
552 WorkUnitState anyState = states.iterator().next();
553 for (int branchId = 0; branchId < numBranches; branchId++) {
554 String mdOutputPath = getMetadataOutputPathFromState(anyState, branchId);
555 String userSpecifiedPath = getUserSpecifiedOutputPathFromState(anyState, branchId);
556
557 if (partitions.isEmpty() || userSpecifiedPath != null) {
558 publishMetadata(getMergedMetadataForPartitionAndBranch(null, branchId),
559 branchId,
560 getMetadataOutputFileForBranch(anyState, branchId));
561 } else {
562 String metadataFilename = getMetadataFileNameForBranch(anyState, branchId);
563 if (mdOutputPath == null || metadataFilename == null) {
564 LOG.info("Metadata filename not set for branch " + String.valueOf(branchId) + ": not publishing metadata.");
565 continue;
566 }
567
568 for (String partition : partitions) {
569 publishMetadata(getMergedMetadataForPartitionAndBranch(partition, branchId),
570 branchId,
571 new Path(new Path(mdOutputPath, partition), metadataFilename));
572 }
573 }
574 }
575 }
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591 private void mergeMetadataAndCollectPartitionNames(Collection<? extends WorkUnitState> states,
592 Set<String> partitionPaths) {
593
594 for (WorkUnitState workUnitState : states) {
595
596
597
598 Map<PartitionIdentifier, Set<FsWriterMetrics>> metricsByPartition = new HashMap<>();
599 boolean partitionFound = false;
600 for (Map.Entry<Object, Object> property : workUnitState.getProperties().entrySet()) {
601 if (((String) property.getKey()).startsWith(ConfigurationKeys.WRITER_PARTITION_PATH_KEY)) {
602 partitionPaths.add((String) property.getValue());
603 partitionFound = true;
604 } else if (((String) property.getKey()).startsWith(FsDataWriter.FS_WRITER_METRICS_KEY)) {
605 try {
606 FsWriterMetrics parsedMetrics = FsWriterMetrics.fromJson((String) property.getValue());
607 partitionPaths.add(parsedMetrics.getPartitionInfo().getPartitionKey());
608 Set<FsWriterMetrics> metricsForPartition =
609 metricsByPartition.computeIfAbsent(parsedMetrics.getPartitionInfo(), k -> new HashSet<>());
610 metricsForPartition.add(parsedMetrics);
611 } catch (IOException e) {
612 LOG.warn("Error parsing metrics from property {} - ignoring", (String) property.getValue());
613 }
614 }
615 }
616
617
618 if (!partitionFound) {
619 partitionPaths.add(null);
620 }
621
622 final String configBasedMetadata = getMetadataFromWorkUnitState(workUnitState);
623
624
625 for (int branchId = 0; branchId < numBranches; branchId++) {
626 for (String partition : partitionPaths) {
627 PartitionIdentifier partitionIdentifier = new PartitionIdentifier(partition, branchId);
628 final int branch = branchId;
629 MetadataMerger<String> mdMerger = metadataMergers.computeIfAbsent(partitionIdentifier,
630 k -> buildMetadataMergerForBranch(configBasedMetadata, branch,
631 getMetadataOutputFileForBranch(workUnitState, branch)));
632 if (shouldPublishWriterMetadataForBranch(branchId)) {
633 String md = getIntermediateMetadataFromState(workUnitState, branchId);
634 mdMerger.update(md);
635 Set<FsWriterMetrics> metricsForPartition =
636 metricsByPartition.getOrDefault(partitionIdentifier, Collections.emptySet());
637 for (FsWriterMetrics metrics : metricsForPartition) {
638 mdMerger.update(metrics);
639 }
640 }
641 }
642 }
643 }
644 }
645
646
647
648
649
650
651 @Override
652 public void publishMetadata(WorkUnitState state)
653 throws IOException {
654 publishMetadata(Collections.singleton(state));
655 }
656
657
658
659
660 private void publishMetadata(String metadataValue, int branchId, Path metadataOutputPath)
661 throws IOException {
662 try {
663 if (metadataOutputPath == null) {
664 LOG.info("Metadata output path not set for branch " + String.valueOf(branchId) + ", not publishing.");
665 return;
666 }
667
668 if (metadataValue == null) {
669 LOG.info("No metadata collected for branch " + String.valueOf(branchId) + ", not publishing.");
670 return;
671 }
672
673 FileSystem fs = this.metaDataWriterFileSystemByBranches.get(branchId);
674
675 if (!fs.exists(metadataOutputPath.getParent())) {
676 WriterUtils.mkdirsWithRecursivePermissionWithRetry(fs, metadataOutputPath, this.permissions.get(branchId), retrierConfig);
677 }
678
679
680 if (fs.exists(metadataOutputPath)) {
681 HadoopUtils.deletePath(fs, metadataOutputPath, false);
682 }
683 LOG.info("Writing metadata for branch " + String.valueOf(branchId) + " to " + metadataOutputPath.toString());
684 try (FSDataOutputStream outputStream = fs.create(metadataOutputPath)) {
685 outputStream.write(metadataValue.getBytes(StandardCharsets.UTF_8));
686 }
687 } catch (IOException e) {
688 LOG.error("Metadata file is not generated: " + e, e);
689 }
690 }
691
692 private String getMetadataFileNameForBranch(WorkUnitState state, int branchId) {
693
694
695 String filePrefix = state.getProp(ConfigurationKeys.DATA_PUBLISHER_METADATA_OUTPUT_FILE);
696 return ForkOperatorUtils.getPropertyNameForBranch(filePrefix, this.numBranches, branchId);
697 }
698
699 private Path getMetadataOutputFileForBranch(WorkUnitState state, int branchId) {
700 String metaDataOutputDirStr = getMetadataOutputPathFromState(state, branchId);
701 String fileName = getMetadataFileNameForBranch(state, branchId);
702 if (metaDataOutputDirStr == null || fileName == null) {
703 return null;
704 }
705 return new Path(metaDataOutputDirStr, fileName);
706 }
707
708 private String getUserSpecifiedOutputPathFromState(WorkUnitState state, int branchId) {
709 String outputDir = state.getProp(ForkOperatorUtils
710 .getPropertyNameForBranch(ConfigurationKeys.DATA_PUBLISHER_METADATA_OUTPUT_DIR, this.numBranches, branchId));
711
712
713
714 if (outputDir == null && this.numBranches > 1) {
715 outputDir = state.getProp(ConfigurationKeys.DATA_PUBLISHER_METADATA_OUTPUT_DIR);
716 if (outputDir != null) {
717 LOG.warn("Branches are configured for this job but a per branch metadata output directory was not set;"
718 + " is this intended?");
719 }
720 }
721
722 return outputDir;
723 }
724
725 private String getMetadataOutputPathFromState(WorkUnitState state, int branchId) {
726 String outputDir = getUserSpecifiedOutputPathFromState(state, branchId);
727
728
729 if (outputDir == null) {
730 String publisherOutputDir = getPublisherOutputDir(state, branchId).toString();
731 LOG.info("Missing metadata output directory path : " + ConfigurationKeys.DATA_PUBLISHER_METADATA_OUTPUT_DIR
732 + " in the config; assuming outputPath " + publisherOutputDir);
733 return publisherOutputDir;
734 }
735
736 return outputDir;
737 }
738
739
740
741
742 private String getIntermediateMetadataFromState(WorkUnitState state, int branchId) {
743 return state.getProp(
744 ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.WRITER_METADATA_KEY, this.numBranches, branchId));
745 }
746
747
748
749
750
751
752
753
754 private String getMergedMetadataForPartitionAndBranch(String partitionId, int branchId) {
755 String mergedMd = null;
756 MetadataMerger<String> mergerForBranch = metadataMergers.get(new PartitionIdentifier(partitionId, branchId));
757 if (mergerForBranch != null) {
758 mergedMd = mergerForBranch.getMergedMetadata();
759 if (mergedMd == null) {
760 LOG.warn("Metadata merger for branch {} returned null - bug in merger?", branchId);
761 }
762 }
763
764 return mergedMd;
765 }
766
767 private boolean shouldPublishWriterMetadataForBranch(int branchId) {
768 String keyName = ForkOperatorUtils
769 .getPropertyNameForBranch(ConfigurationKeys.DATA_PUBLISH_WRITER_METADATA_KEY, this.numBranches, branchId);
770 return this.getState().getPropAsBoolean(keyName, false);
771 }
772
773
774
775
776 private String getMetadataFromWorkUnitState(WorkUnitState workUnitState) {
777 return workUnitState.getProp(ConfigurationKeys.DATA_PUBLISHER_METADATA_STR);
778 }
779
780
781
782
783
784 @Override
785 protected boolean shouldPublishMetadataFirst() {
786 return false;
787 }
788 }