1 package org.wikimedia.gobblin.publisher; 2 3 import java.util.concurrent.locks.Lock; 4 5 import org.apache.gobblin.util.ParallelRunner; 6 import org.apache.hadoop.fs.FileAlreadyExistsException; 7 import org.apache.hadoop.fs.FileSystem; 8 import org.apache.hadoop.fs.Path; 9 import org.slf4j.LoggerFactory; 10 import org.slf4j.Logger; 11 12 import com.google.common.util.concurrent.Striped; 13 14 15 public class ParallelRunnerWithTouch extends ParallelRunner { 16 17 private static final Logger LOGGER = LoggerFactory.getLogger(ParallelRunnerWithTouch.class); 18 19 private final Striped<Lock> locks = Striped.lazyWeakLock(2147483647); 20 21 public ParallelRunnerWithTouch(int threads, FileSystem fs) { 22 super(threads, fs); 23 } 24 25 public void touchPath(final Path path) { 26 this.submitCallable(() -> { 27 Lock lock = locks.get(path.toString()); 28 lock.lock(); 29 try { 30 if (getFs().isDirectory(path.getParent())) { 31 if (!getFs().exists(path)) { 32 getFs().create(path).close(); 33 } 34 } else { 35 LOGGER.warn("Failed to touch {} as parent is not an existing folder", path); 36 } 37 } catch (FileAlreadyExistsException fileExistsException) { 38 LOGGER.warn("Failed to touch {} as parent is not an existing folder", path, fileExistsException); 39 } finally { 40 lock.unlock(); 41 } 42 return null; 43 }, "Touch path " + path); 44 } 45 }