# Copyright 2013-2023 Lawrence Livermore National Security, LLC and other # Spack Project Developers. See the top-level COPYRIGHT file for details. # # SPDX-License-Identifier: (Apache-2.0 OR MIT) import os import sys import pytest import llnl.util.symlink from llnl.util.filesystem import mkdirp, touchp, visit_directory_tree, working_dir from llnl.util.link_tree import DestinationMergeVisitor, LinkTree, SourceMergeVisitor from llnl.util.symlink import _windows_can_symlink, islink, readlink, symlink from spack.stage import Stage @pytest.fixture() def stage(): """Creates a stage with the directory structure for the tests.""" s = Stage("link-tree-test") s.create() with working_dir(s.path): touchp("source/1") touchp("source/a/b/2") touchp("source/a/b/3") touchp("source/c/4") touchp("source/c/d/5") touchp("source/c/d/6") touchp("source/c/d/e/7") yield s s.destroy() @pytest.fixture() def link_tree(stage): """Return a properly initialized LinkTree instance.""" source_path = os.path.join(stage.path, "source") return LinkTree(source_path) def check_file_link(filename, expected_target): assert os.path.isfile(filename) assert islink(filename) if sys.platform != "win32" or llnl.util.symlink._windows_can_symlink(): assert os.path.abspath(os.path.realpath(filename)) == os.path.abspath(expected_target) def check_dir(filename): assert os.path.isdir(filename) @pytest.mark.parametrize("run_as_root", [True, False]) def test_merge_to_new_directory(stage, link_tree, monkeypatch, run_as_root): if sys.platform != "win32": if run_as_root: pass else: pytest.skip("Skipping duplicate test.") elif _windows_can_symlink() or not run_as_root: monkeypatch.setattr(llnl.util.symlink, "_windows_can_symlink", lambda: run_as_root) else: # Skip if trying to run as dev-mode without having dev-mode. pytest.skip("Skipping portion of test which required dev-mode privileges.") with working_dir(stage.path): link_tree.merge("dest") files = [ ("dest/1", "source/1"), ("dest/a/b/2", "source/a/b/2"), ("dest/a/b/3", "source/a/b/3"), ("dest/c/4", "source/c/4"), ("dest/c/d/5", "source/c/d/5"), ("dest/c/d/6", "source/c/d/6"), ("dest/c/d/e/7", "source/c/d/e/7"), ] for dest, source in files: check_file_link(dest, source) assert os.path.isabs(readlink(dest)) link_tree.unmerge("dest") assert not os.path.exists("dest") @pytest.mark.parametrize("run_as_root", [True, False]) def test_merge_to_new_directory_relative(stage, link_tree, monkeypatch, run_as_root): if sys.platform != "win32": if run_as_root: pass else: pytest.skip("Skipping duplicate test.") elif _windows_can_symlink() or not run_as_root: monkeypatch.setattr(llnl.util.symlink, "_windows_can_symlink", lambda: run_as_root) else: # Skip if trying to run as dev-mode without having dev-mode. pytest.skip("Skipping portion of test which required dev-mode privileges.") with working_dir(stage.path): link_tree.merge("dest", relative=True) files = [ ("dest/1", "source/1"), ("dest/a/b/2", "source/a/b/2"), ("dest/a/b/3", "source/a/b/3"), ("dest/c/4", "source/c/4"), ("dest/c/d/5", "source/c/d/5"), ("dest/c/d/6", "source/c/d/6"), ("dest/c/d/e/7", "source/c/d/e/7"), ] for dest, source in files: check_file_link(dest, source) # Hard links/junctions are inherently absolute. if sys.platform != "win32" or run_as_root: assert not os.path.isabs(readlink(dest)) link_tree.unmerge("dest") assert not os.path.exists("dest") @pytest.mark.parametrize("run_as_root", [True, False]) def test_merge_to_existing_directory(stage, link_tree, monkeypatch, run_as_root): if sys.platform != "win32": if run_as_root: pass else: pytest.skip("Skipping duplicate test.") elif _windows_can_symlink() or not run_as_root: monkeypatch.setattr(llnl.util.symlink, "_windows_can_symlink", lambda: run_as_root) else: # Skip if trying to run as dev-mode without having dev-mode. pytest.skip("Skipping portion of test which required dev-mode privileges.") with working_dir(stage.path): touchp("dest/x") touchp("dest/a/b/y") link_tree.merge("dest") files = [ ("dest/1", "source/1"), ("dest/a/b/2", "source/a/b/2"), ("dest/a/b/3", "source/a/b/3"), ("dest/c/4", "source/c/4"), ("dest/c/d/5", "source/c/d/5"), ("dest/c/d/6", "source/c/d/6"), ("dest/c/d/e/7", "source/c/d/e/7"), ] for dest, source in files: check_file_link(dest, source) assert os.path.isfile("dest/x") assert os.path.isfile("dest/a/b/y") link_tree.unmerge("dest") assert os.path.isfile("dest/x") assert os.path.isfile("dest/a/b/y") for dest, _ in files: assert not os.path.isfile(dest) def test_merge_with_empty_directories(stage, link_tree): with working_dir(stage.path): mkdirp("dest/f/g") mkdirp("dest/a/b/h") link_tree.merge("dest") link_tree.unmerge("dest") assert not os.path.exists("dest/1") assert not os.path.exists("dest/a/b/2") assert not os.path.exists("dest/a/b/3") assert not os.path.exists("dest/c/4") assert not os.path.exists("dest/c/d/5") assert not os.path.exists("dest/c/d/6") assert not os.path.exists("dest/c/d/e/7") assert os.path.isdir("dest/a/b/h") assert os.path.isdir("dest/f/g") def test_ignore(stage, link_tree): with working_dir(stage.path): touchp("source/.spec") touchp("dest/.spec") link_tree.merge("dest", ignore=lambda x: x == ".spec") link_tree.unmerge("dest", ignore=lambda x: x == ".spec") assert not os.path.exists("dest/1") assert not os.path.exists("dest/a") assert not os.path.exists("dest/c") assert os.path.isfile("source/.spec") assert os.path.isfile("dest/.spec") def test_source_merge_visitor_does_not_follow_symlinked_dirs_at_depth(tmpdir): """Given an dir structure like this:: . `-- a |-- b | |-- c | | |-- d | | | `-- file | | `-- symlink_d -> d | `-- symlink_c -> c `-- symlink_b -> b The SoureMergeVisitor will expand symlinked dirs to directories, but only to fixed depth, to avoid exponential explosion. In our current defaults, symlink_b will be expanded, but symlink_c and symlink_d will not. """ j = os.path.join with tmpdir.as_cwd(): os.mkdir(j("a")) os.mkdir(j("a", "b")) os.mkdir(j("a", "b", "c")) os.mkdir(j("a", "b", "c", "d")) symlink(j("b"), j("a", "symlink_b")) symlink(j("c"), j("a", "b", "symlink_c")) symlink(j("d"), j("a", "b", "c", "symlink_d")) with open(j("a", "b", "c", "d", "file"), "wb"): pass visitor = SourceMergeVisitor() visit_directory_tree(str(tmpdir), visitor) assert [p for p in visitor.files.keys()] == [ j("a", "b", "c", "d", "file"), j("a", "b", "c", "symlink_d"), # treated as a file, not expanded j("a", "b", "symlink_c"), # treated as a file, not expanded j("a", "symlink_b", "c", "d", "file"), # symlink_b was expanded j("a", "symlink_b", "c", "symlink_d"), # symlink_b was expanded j("a", "symlink_b", "symlink_c"), # symlink_b was expanded ] assert [p for p in visitor.directories.keys()] == [ j("a"), j("a", "b"), j("a", "b", "c"), j("a", "b", "c", "d"), j("a", "symlink_b"), j("a", "symlink_b", "c"), j("a", "symlink_b", "c", "d"), ] def test_source_merge_visitor_cant_be_cyclical(tmpdir): """Given an dir structure like this:: . |-- a | `-- symlink_b -> ../b | `-- symlink_symlink_b -> symlink_b `-- b `-- symlink_a -> ../a The SoureMergeVisitor will not expand `a/symlink_b`, `a/symlink_symlink_b` and `b/symlink_a` to avoid recursion. The general rule is: only expand symlinked dirs pointing deeper into the directory structure. """ j = os.path.join with tmpdir.as_cwd(): os.mkdir(j("a")) os.mkdir(j("b")) symlink(j("..", "b"), j("a", "symlink_b")) symlink(j("symlink_b"), j("a", "symlink_b_b")) symlink(j("..", "a"), j("b", "symlink_a")) visitor = SourceMergeVisitor() visit_directory_tree(str(tmpdir), visitor) assert [p for p in visitor.files.keys()] == [ j("a", "symlink_b"), j("a", "symlink_b_b"), j("b", "symlink_a"), ] assert [p for p in visitor.directories.keys()] == [j("a"), j("b")] def test_destination_merge_visitor_always_errors_on_symlinked_dirs(tmpdir): """When merging prefixes into a non-empty destination folder, and this destination folder has a symlinked dir where the prefix has a dir, we should never merge any files there, but register a fatal error.""" j = os.path.join # Here example_a and example_b are symlinks. with tmpdir.mkdir("dst").as_cwd(): os.mkdir("a") os.symlink("a", "example_a") os.symlink("a", "example_b") # Here example_a is a directory, and example_b is a (non-expanded) symlinked # directory. with tmpdir.mkdir("src").as_cwd(): os.mkdir("example_a") with open(j("example_a", "file"), "wb"): pass os.symlink("..", "example_b") visitor = SourceMergeVisitor() visit_directory_tree(str(tmpdir.join("src")), visitor) visit_directory_tree(str(tmpdir.join("dst")), DestinationMergeVisitor(visitor)) assert visitor.fatal_conflicts conflicts = [c.dst for c in visitor.fatal_conflicts] assert "example_a" in conflicts assert "example_b" in conflicts def test_destination_merge_visitor_file_dir_clashes(tmpdir): """Tests whether non-symlink file-dir and dir-file clashes as registered as fatal errors""" with tmpdir.mkdir("a").as_cwd(): os.mkdir("example") with tmpdir.mkdir("b").as_cwd(): with open("example", "wb"): pass a_to_b = SourceMergeVisitor() visit_directory_tree(str(tmpdir.join("a")), a_to_b) visit_directory_tree(str(tmpdir.join("b")), DestinationMergeVisitor(a_to_b)) assert a_to_b.fatal_conflicts assert a_to_b.fatal_conflicts[0].dst == "example" b_to_a = SourceMergeVisitor() visit_directory_tree(str(tmpdir.join("b")), b_to_a) visit_directory_tree(str(tmpdir.join("a")), DestinationMergeVisitor(b_to_a)) assert b_to_a.fatal_conflicts assert b_to_a.fatal_conflicts[0].dst == "example"