summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/spack/llnl/util/filesystem.py138
-rw-r--r--lib/spack/spack/test/llnl/util/filesystem.py31
2 files changed, 168 insertions, 1 deletions
diff --git a/lib/spack/llnl/util/filesystem.py b/lib/spack/llnl/util/filesystem.py
index 8af2f28342..2b4cbe2424 100644
--- a/lib/spack/llnl/util/filesystem.py
+++ b/lib/spack/llnl/util/filesystem.py
@@ -5,18 +5,20 @@
import collections
import collections.abc
import errno
+import fnmatch
import glob
import hashlib
import itertools
import numbers
import os
+import posixpath
import re
import shutil
import stat
import sys
import tempfile
from contextlib import contextmanager
-from typing import Callable, List, Match, Optional, Tuple, Union
+from typing import Callable, Iterable, List, Match, Optional, Tuple, Union
from llnl.util import tty
from llnl.util.lang import dedupe, memoized
@@ -1671,6 +1673,38 @@ def fix_darwin_install_name(path):
break
+def find_first(root: str, files: Union[Iterable[str], str], bfs_depth: int = 2) -> Optional[str]:
+ """Find the first file matching a pattern.
+
+ The following
+
+ .. code-block:: console
+
+ $ find /usr -name 'abc*' -o -name 'def*' -quit
+
+ is equivalent to:
+
+ >>> find_first("/usr", ["abc*", "def*"])
+
+ Any glob pattern supported by fnmatch can be used.
+
+ The search order of this method is breadth-first over directories,
+ until depth bfs_depth, after which depth-first search is used.
+
+ Parameters:
+ root (str): The root directory to start searching from
+ files (str or Iterable): File pattern(s) to search for
+ bfs_depth (int): (advanced) parameter that specifies at which
+ depth to switch to depth-first search.
+
+ Returns:
+ str or None: The matching file or None when no file is found.
+ """
+ if isinstance(files, str):
+ files = [files]
+ return FindFirstFile(root, *files, bfs_depth=bfs_depth).find()
+
+
def find(root, files, recursive=True):
"""Search for ``files`` starting from the ``root`` directory.
@@ -2720,3 +2754,105 @@ def filesummary(path, print_bytes=16) -> Tuple[int, bytes]:
return size, short_contents
except OSError:
return 0, b""
+
+
+class FindFirstFile:
+ """Uses hybrid iterative deepening to locate the first matching
+ file. Up to depth ``bfs_depth`` it uses iterative deepening, which
+ mimics breadth-first with the same memory footprint as depth-first
+ search, after which it switches to ordinary depth-first search using
+ ``os.walk``."""
+
+ def __init__(self, root: str, *file_patterns: str, bfs_depth: int = 2):
+ """Create a small summary of the given file. Does not error
+ when file does not exist.
+
+ Args:
+ root (str): directory in which to recursively search
+ file_patterns (str): glob file patterns understood by fnmatch
+ bfs_depth (int): until this depth breadth-first traversal is used,
+ when no match is found, the mode is switched to depth-first search.
+ """
+ self.root = root
+ self.bfs_depth = bfs_depth
+ self.match: Callable
+
+ # normcase is trivial on posix
+ regex = re.compile("|".join(fnmatch.translate(os.path.normcase(p)) for p in file_patterns))
+
+ # On case sensitive filesystems match against normcase'd paths.
+ if os.path is posixpath:
+ self.match = regex.match
+ else:
+ self.match = lambda p: regex.match(os.path.normcase(p))
+
+ def find(self) -> Optional[str]:
+ """Run the file search
+
+ Returns:
+ str or None: path of the matching file
+ """
+ self.file = None
+
+ # First do iterative deepening (i.e. bfs through limited depth dfs)
+ for i in range(self.bfs_depth + 1):
+ if self._find_at_depth(self.root, i):
+ return self.file
+
+ # Then fall back to depth-first search
+ return self._find_dfs()
+
+ def _find_at_depth(self, path, max_depth, depth=0) -> bool:
+ """Returns True when done. Notice search can be done
+ either because a file was found, or because it recursed
+ through all directories."""
+ try:
+ entries = os.scandir(path)
+ except OSError:
+ return True
+
+ done = True
+
+ with entries:
+ # At max depth we look for matching files.
+ if depth == max_depth:
+ for f in entries:
+ # Exit on match
+ if self.match(f.name):
+ self.file = os.path.join(path, f.name)
+ return True
+
+ # is_dir should not require a stat call, so it's a good optimization.
+ if self._is_dir(f):
+ done = False
+ return done
+
+ # At lower depth only recurse into subdirs
+ for f in entries:
+ if not self._is_dir(f):
+ continue
+
+ # If any subdir is not fully traversed, we're not done yet.
+ if not self._find_at_depth(os.path.join(path, f.name), max_depth, depth + 1):
+ done = False
+
+ # Early exit when we've found something.
+ if self.file:
+ return True
+
+ return done
+
+ def _is_dir(self, f: os.DirEntry) -> bool:
+ """Returns True when f is dir we can enter (and not a symlink)."""
+ try:
+ return f.is_dir(follow_symlinks=False)
+ except OSError:
+ return False
+
+ def _find_dfs(self) -> Optional[str]:
+ """Returns match or None"""
+ for dirpath, _, filenames in os.walk(self.root):
+ for file in filenames:
+ if self.match(file):
+ return os.path.join(dirpath, file)
+ return None
diff --git a/lib/spack/spack/test/llnl/util/filesystem.py b/lib/spack/spack/test/llnl/util/filesystem.py
index 5e81b52c53..018137f406 100644
--- a/lib/spack/spack/test/llnl/util/filesystem.py
+++ b/lib/spack/spack/test/llnl/util/filesystem.py
@@ -871,3 +871,34 @@ def test_filesummary(tmpdir):
assert fs.filesummary(p, print_bytes=8) == (26, b"abcdefgh...stuvwxyz")
assert fs.filesummary(p, print_bytes=13) == (26, b"abcdefghijklmnopqrstuvwxyz")
assert fs.filesummary(p, print_bytes=100) == (26, b"abcdefghijklmnopqrstuvwxyz")
+
+
+@pytest.mark.parametrize("bfs_depth", [1, 2, 10])
+def test_find_first_file(tmpdir, bfs_depth):
+ # Create a structure: a/a/a/{file1,file2}, b/a, c/a, d/{a,file1}
+ tmpdir.join("a", "a", "a").ensure(dir=True)
+ tmpdir.join("b", "a").ensure(dir=True)
+ tmpdir.join("c", "a").ensure(dir=True)
+ tmpdir.join("d", "a").ensure(dir=True)
+ tmpdir.join("e").ensure(dir=True)
+
+ fs.touch(tmpdir.join("a", "a", "a", "file1"))
+ fs.touch(tmpdir.join("a", "a", "a", "file2"))
+ fs.touch(tmpdir.join("d", "file1"))
+
+ root = str(tmpdir)
+
+ # Iterative deepening: should find low-depth file1.
+ assert os.path.samefile(
+ fs.find_first(root, "file*", bfs_depth=bfs_depth), os.path.join(root, "d", "file1")
+ )
+
+ assert fs.find_first(root, "nonexisting", bfs_depth=bfs_depth) is None
+
+ assert os.path.samefile(
+ fs.find_first(root, ["nonexisting", "file2"], bfs_depth=bfs_depth),
+ os.path.join(root, "a", "a", "a", "file2"),
+ )
+
+ # Should find first dir
+ assert os.path.samefile(fs.find_first(root, "a", bfs_depth=bfs_depth), os.path.join(root, "a"))