View Javadoc
1   package org.wikimedia.gobblin.publisher;
2   
3   import java.util.concurrent.locks.Lock;
4   
5   import org.apache.gobblin.util.ParallelRunner;
6   import org.apache.hadoop.fs.FileAlreadyExistsException;
7   import org.apache.hadoop.fs.FileSystem;
8   import org.apache.hadoop.fs.Path;
9   import org.slf4j.LoggerFactory;
10  import org.slf4j.Logger;
11  
12  import com.google.common.util.concurrent.Striped;
13  
14  
15  public class ParallelRunnerWithTouch extends ParallelRunner {
16  
17      private static final Logger LOGGER = LoggerFactory.getLogger(ParallelRunnerWithTouch.class);
18  
19      private final Striped<Lock> locks = Striped.lazyWeakLock(2147483647);
20  
21      public ParallelRunnerWithTouch(int threads, FileSystem fs) {
22          super(threads, fs);
23      }
24  
25      public void touchPath(final Path path) {
26          this.submitCallable(() -> {
27              Lock lock = locks.get(path.toString());
28              lock.lock();
29              try {
30                  if (getFs().isDirectory(path.getParent())) {
31                      if (!getFs().exists(path)) {
32                          getFs().create(path).close();
33                      }
34                  } else {
35                      LOGGER.warn("Failed to touch {} as parent is not an existing folder", path);
36                  }
37              } catch (FileAlreadyExistsException fileExistsException) {
38                  LOGGER.warn("Failed to touch {} as parent is not an existing folder", path, fileExistsException);
39              } finally {
40                  lock.unlock();
41              }
42              return null;
43          }, "Touch path " + path);
44      }
45  }