ParallelRunnerWithTouch.java
package org.wikimedia.gobblin.publisher;
import java.util.concurrent.locks.Lock;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import com.google.common.util.concurrent.Striped;
public class ParallelRunnerWithTouch extends ParallelRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(ParallelRunnerWithTouch.class);
private final Striped<Lock> locks = Striped.lazyWeakLock(2147483647);
public ParallelRunnerWithTouch(int threads, FileSystem fs) {
super(threads, fs);
}
public void touchPath(final Path path) {
this.submitCallable(() -> {
Lock lock = locks.get(path.toString());
lock.lock();
try {
if (getFs().isDirectory(path.getParent())) {
if (!getFs().exists(path)) {
getFs().create(path).close();
}
} else {
LOGGER.warn("Failed to touch {} as parent is not an existing folder", path);
}
} catch (FileAlreadyExistsException fileExistsException) {
LOGGER.warn("Failed to touch {} as parent is not an existing folder", path, fileExistsException);
} finally {
lock.unlock();
}
return null;
}, "Touch path " + path);
}
}