1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.wikimedia.gobblin.publisher;
19
20 import java.io.IOException;
21 import java.net.URI;
22 import java.util.AbstractMap;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.HashSet;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Set;
31 import java.util.TreeSet;
32 import java.util.stream.Collectors;
33
34 import org.apache.gobblin.configuration.ConfigurationKeys;
35 import org.apache.gobblin.configuration.State;
36 import org.apache.gobblin.configuration.WorkUnitState;
37 import org.apache.gobblin.publisher.DataPublisher;
38 import org.apache.gobblin.util.ParallelRunner;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.fs.FileSystem;
41 import org.apache.hadoop.fs.Path;
42
43 import com.google.common.base.Preconditions;
44 import com.google.common.collect.Sets;
45 import com.google.common.io.Closer;
46
47 import lombok.extern.slf4j.Slf4j;
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64 @Slf4j
65 public class TimePartitionedFlagDataPublisher extends DataPublisher {
66
/** State property to which the paths of the flags recorded by this publisher are appended on {@link #close()}. */
public static final String PUBLISHER_PUBLISHED_FLAGS_KEY = ConfigurationKeys.DATA_PUBLISHER_PREFIX + ".published.flags";

/** State property configuring the name of the flag file placed in time-partition folders. */
public static final String PUBLISHER_TIME_PARTITION_FLAG_KEY = ConfigurationKeys.DATA_PUBLISHER_PREFIX + ".timepartition.flag";
/** Flag file name used when {@link #PUBLISHER_TIME_PARTITION_FLAG_KEY} is not set. */
public static final String DEFAULT_PUBLISHER_TIME_PARTITION_FLAG = "_IMPORTED";

/** Name of the flag file, read from the state in the constructor. */
private final String flag;

/** Holds the resources released when this publisher is closed. */
private final Closer closer = Closer.create();
/** Runner used to touch the flag paths; registered with {@link #closer} in the constructor. */
private final ParallelRunnerWithTouch parallelRunner;

/** Flag paths handed to the runner so far by this publisher instance. */
private final Set<Path> publishedFlags = Sets.newHashSet();
78
79 public TimePartitionedFlagDataPublisher(State state) throws IOException {
80 super(state);
81
82 int parallelRunnerThreads = state.getPropAsInt(ParallelRunner.PARALLEL_RUNNER_THREADS_KEY, ParallelRunner.DEFAULT_PARALLEL_RUNNER_THREADS);
83 this.flag = state.getProp(PUBLISHER_TIME_PARTITION_FLAG_KEY, DEFAULT_PUBLISHER_TIME_PARTITION_FLAG);
84 log.info("Time-partition flag for dataset {} is: {}", state.getProp(ConfigurationKeys.DATASET_URN_KEY), flag);
85
86 FileSystem publisherFs = getPublisherFileSystem(state);
87
88 publisherFs.setWriteChecksum(false);
89
90 this.parallelRunner = new ParallelRunnerWithTouch(parallelRunnerThreads, publisherFs);
91 this.closer.register(this.parallelRunner);
92 }
93
94
95 public static FileSystem getPublisherFileSystem(State state) throws IOException {
96 Configuration conf = new Configuration();
97
98
99 for (String key : state.getPropertyNames()) {
100 conf.set(key, state.getProp(key));
101 }
102 URI writerUri = URI.create(state.getProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, ConfigurationKeys.LOCAL_FS_URI));
103 URI publisherUri = URI.create(state.getProp(ConfigurationKeys.DATA_PUBLISHER_FILE_SYSTEM_URI, writerUri.toString()));
104 return FileSystem.get(publisherUri, conf);
105 }
106
/**
 * Does nothing; all set-up of this publisher happens in its constructor.
 *
 * @deprecated this method performs no work.
 */
@Deprecated
@Override
public void initialize() {
    // Intentionally empty.
}
112
113 @Override
114 public void close() throws IOException {
115 try {
116 for (Path path : this.publishedFlags) {
117 this.state.appendToSetProp(PUBLISHER_PUBLISHED_FLAGS_KEY, path.toString());
118 }
119 } finally {
120 this.closer.close();
121 }
122 }
123
124
125
126
127
128 private Map<String, Set<String>> getPartitionsToFlagByTable(Collection<? extends WorkUnitState> states) {
129 List<String> tablesNotToFlag = new ArrayList<>();
130
131
132
133
134 Map<String, Map.Entry<String, Set<String>>> tablesTimePartitions = new HashMap<>();
135
136
137 for (WorkUnitState workUnitState : states) {
138 Preconditions.checkArgument(workUnitState.contains(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY));
139
140 String tableName = workUnitState.getProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY);
141
142 if (workUnitState.getWorkingState() == WorkUnitState.WorkingState.FAILED) {
143 tablesTimePartitions.remove(tableName);
144 tablesNotToFlag.add(tableName);
145 log.debug(" Marking table {} as NOT to be flagged due to failed tasks", tableName);
146 }
147
148
149 if (workUnitState.getWorkingState() == WorkUnitState.WorkingState.COMMITTED &&
150 workUnitState.getPropAsInt(ConfigurationKeys.WRITER_RECORDS_WRITTEN) > 0 &&
151 !tablesNotToFlag.contains(tableName)) {
152
153
154 TreeSet<String> writtenPartitions = new TreeSet<>(Collections.reverseOrder());
155 for (Map.Entry<Object, Object> property : workUnitState.getProperties().entrySet()) {
156 if (((String) property.getKey()).startsWith(ConfigurationKeys.WRITER_PARTITION_PATH_KEY)) {
157 writtenPartitions.add((String) property.getValue());
158 }
159 }
160
161
162
163 String writtenMax = writtenPartitions.pollFirst();
164
165 if (null != writtenMax) {
166
167 if (!tablesTimePartitions.containsKey(tableName)) {
168 tablesTimePartitions.put(tableName, new AbstractMap.SimpleEntry<>(writtenMax, new HashSet<>(writtenPartitions)));
169 } else {
170 String tableMax = tablesTimePartitions.get(tableName).getKey();
171 Set<String> tablePartitionsToFlag = tablesTimePartitions.get(tableName).getValue();
172
173
174 tablePartitionsToFlag.addAll(writtenPartitions);
175
176 final String newMax = (writtenMax.compareTo(tableMax) < 0) ? writtenMax : tableMax;
177
178 tablePartitionsToFlag.removeIf(p -> p.compareTo(newMax) >= 0);
179
180 tablesTimePartitions.put(tableName, new AbstractMap.SimpleEntry<>(newMax, tablePartitionsToFlag));
181 }
182 }
183 }
184 }
185 Map<String, Set<String>> result = new HashMap<>();
186 tablesTimePartitions.forEach((k, v) -> result.put(k, v.getValue()));
187
188 return result;
189 }
190
191
192
193
194
195
196 private void writeFlags(Map<String, Set<String>> tablesPartitions) {
197 String publisherFinalBaseDir = state.getProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR);
198
199 Set<String> publishedDirs = state.getPropAsSet(ConfigurationKeys.PUBLISHER_DIRS, "");
200
201 for (Map.Entry<String, Set<String>> tableAndPartitions: tablesPartitions.entrySet()) {
202 String table = tableAndPartitions.getKey();
203 for (String partitionToFlag: tableAndPartitions.getValue()) {
204 String pathToFlag = publisherFinalBaseDir + "/" + table + "/" + partitionToFlag;
205 if (publishedDirs.contains(pathToFlag)) {
206 Path flagPath = new Path(pathToFlag, flag);
207 parallelRunner.touchPath(flagPath);
208 publishedFlags.add(flagPath);
209 } else {
210 log.warn("Path-to-flag {} is not present in the list of published-directories", pathToFlag);
211 }
212 }
213 }
214 }
215
216 @Override
217 public void publishData(Collection<? extends WorkUnitState> states) throws IOException {
218
219 log.debug("Compute time-partitions to flag");
220 Map<String, Set<String>> tablesPartitions = getPartitionsToFlagByTable(states);
221
222 log.debug("Write flags in time-partition folders");
223 writeFlags(tablesPartitions);
224
225 List<String> publishedFlagsString = this.publishedFlags.stream().map(Path::toString).collect(Collectors.toList());
226 log.info("{} time-partition flags published: {}", this.publishedFlags.size(), publishedFlagsString);
227 }
228
/**
 * Does nothing: this implementation publishes no metadata.
 *
 * @param states the work-unit states of the job's tasks (unused)
 * @throws IOException declared by the overridden method; never thrown here
 */
@Override
public void publishMetadata(Collection<? extends WorkUnitState> states) throws IOException {
    // Intentionally empty.
}
233
234 }