From 5b272e3ff3f7cac83d4e3db402781f535950d26f Mon Sep 17 00:00:00 2001 From: Massimiliano Culpo Date: Fri, 5 Jun 2020 09:08:32 +0200 Subject: commands: use a single ThreadPool for `spack versions` (#16749) This fixes a fork bomb in `spack versions`. Recursive generation of pools to scrape URLs in `_spider` was creating large numbers of processes. Instead of recursively creating process pools, we now use a single `ThreadPool` with a concurrency limit. More on the issue: having ~10 users running at the same time spack versions on front-end nodes caused kernel lockup due to the high number of sockets opened (sys-admin reports ~210k distributed over 3 nodes). Users were internal, so they had ulimit -n set to ~70k. The forking behavior could be observed by just running: $ spack versions boost and checking the number of processes spawned. Number of processes per se was not the issue, but each one of them opens a socket which can stress `iptables`. In the original issue the kernel watchdog was reporting: Message from syslogd@login03 at May 19 12:01:30 ... kernel:Watchdog CPU:110 Hard LOCKUP Message from syslogd@login03 at May 19 12:01:31 ... kernel:watchdog: BUG: soft lockup - CPU#110 stuck for 23s! [python3:2756] Message from syslogd@login03 at May 19 12:01:31 ... kernel:watchdog: BUG: soft lockup - CPU#94 stuck for 22s! [iptables:5603] --- lib/spack/spack/cmd/versions.py | 6 +- lib/spack/spack/package.py | 5 +- lib/spack/spack/test/web.py | 200 +++++++++++++----------------- lib/spack/spack/util/web.py | 268 ++++++++++++++++++++-------------------- 4 files changed, 228 insertions(+), 251 deletions(-) (limited to 'lib') diff --git a/lib/spack/spack/cmd/versions.py b/lib/spack/spack/cmd/versions.py index 723f89ce08..366307f0b2 100644 --- a/lib/spack/spack/cmd/versions.py +++ b/lib/spack/spack/cmd/versions.py @@ -21,6 +21,10 @@ level = "long" def setup_parser(subparser): subparser.add_argument('-s', '--safe-only', action='store_true', help='only list safe versions of the package') + subparser.add_argument( + '-c', '--concurrency', default=32, type=int, + help='number of concurrent requests' + ) arguments.add_common_arguments(subparser, ['package']) @@ -45,7 +49,7 @@ def versions(parser, args): if sys.stdout.isatty(): tty.msg('Remote versions (not yet checksummed):') - fetched_versions = pkg.fetch_remote_versions() + fetched_versions = pkg.fetch_remote_versions(args.concurrency) remote_versions = set(fetched_versions).difference(safe_versions) if not remote_versions: diff --git a/lib/spack/spack/package.py b/lib/spack/spack/package.py index bb5ea41dc3..c9774c5b29 100644 --- a/lib/spack/spack/package.py +++ b/lib/spack/spack/package.py @@ -2020,7 +2020,7 @@ class PackageBase(with_metaclass(PackageMeta, PackageViewMixin, object)): urls.append(args['url']) return urls - def fetch_remote_versions(self): + def fetch_remote_versions(self, concurrency=128): """Find remote versions of this package. Uses ``list_url`` and any other URLs listed in the package file. 
@@ -2033,7 +2033,8 @@ class PackageBase(with_metaclass(PackageMeta, PackageViewMixin, object)): try: return spack.util.web.find_versions_of_archive( - self.all_urls, self.list_url, self.list_depth) + self.all_urls, self.list_url, self.list_depth, concurrency + ) except spack.util.web.NoNetworkConnectionError as e: tty.die("Package.fetch_versions couldn't connect to:", e.url, e.message) diff --git a/lib/spack/spack/test/web.py b/lib/spack/spack/test/web.py index ae62301319..dfca41c95b 100644 --- a/lib/spack/spack/test/web.py +++ b/lib/spack/spack/test/web.py @@ -2,125 +2,101 @@ # Spack Project Developers. See the top-level COPYRIGHT file for details. # # SPDX-License-Identifier: (Apache-2.0 OR MIT) - -"""Tests for web.py.""" import os -import pytest - -from ordereddict_backport import OrderedDict +import ordereddict_backport +import pytest import spack.paths -import spack.util.web as web_util +import spack.util.web from spack.version import ver -web_data_path = os.path.join(spack.paths.test_path, 'data', 'web') - -root = 'file://' + web_data_path + '/index.html' -root_tarball = 'file://' + web_data_path + '/foo-0.0.0.tar.gz' - -page_1 = 'file://' + os.path.join(web_data_path, '1.html') -page_2 = 'file://' + os.path.join(web_data_path, '2.html') -page_3 = 'file://' + os.path.join(web_data_path, '3.html') -page_4 = 'file://' + os.path.join(web_data_path, '4.html') - - -def test_spider_0(): - pages, links = web_util.spider(root, depth=0) - - assert root in pages - assert page_1 not in pages - assert page_2 not in pages - assert page_3 not in pages - assert page_4 not in pages - - assert "This is the root page." in pages[root] - - assert root not in links - assert page_1 in links - assert page_2 not in links - assert page_3 not in links - assert page_4 not in links - - -def test_spider_1(): - pages, links = web_util.spider(root, depth=1) - - assert root in pages - assert page_1 in pages - assert page_2 not in pages - assert page_3 not in pages - assert page_4 not in pages - - assert "This is the root page." in pages[root] - assert "This is page 1." in pages[page_1] - - assert root not in links - assert page_1 in links - assert page_2 in links - assert page_3 not in links - assert page_4 not in links - - -def test_spider_2(): - pages, links = web_util.spider(root, depth=2) - - assert root in pages - assert page_1 in pages - assert page_2 in pages - assert page_3 not in pages - assert page_4 not in pages - - assert "This is the root page." in pages[root] - assert "This is page 1." in pages[page_1] - assert "This is page 2." in pages[page_2] - - assert root not in links - assert page_1 in links - assert page_1 in links - assert page_2 in links - assert page_3 in links - assert page_4 in links - - -def test_spider_3(): - pages, links = web_util.spider(root, depth=3) - - assert root in pages - assert page_1 in pages - assert page_2 in pages - assert page_3 in pages - assert page_4 in pages - - assert "This is the root page." in pages[root] - assert "This is page 1." in pages[page_1] - assert "This is page 2." in pages[page_2] - assert "This is page 3." in pages[page_3] - assert "This is page 4." 
in pages[page_4] - - assert root in links # circular link on page 3 - assert page_1 in links - assert page_1 in links - assert page_2 in links - assert page_3 in links - assert page_4 in links +def _create_url(relative_url): + web_data_path = os.path.join(spack.paths.test_path, 'data', 'web') + return 'file://' + os.path.join(web_data_path, relative_url) + + +root = _create_url('index.html') +root_tarball = _create_url('foo-0.0.0.tar.gz') +page_1 = _create_url('1.html') +page_2 = _create_url('2.html') +page_3 = _create_url('3.html') +page_4 = _create_url('4.html') + + +@pytest.mark.parametrize( + 'depth,expected_found,expected_not_found,expected_text', [ + (0, + {'pages': [root], 'links': [page_1]}, + {'pages': [page_1, page_2, page_3, page_4], + 'links': [root, page_2, page_3, page_4]}, + {root: "This is the root page."}), + (1, + {'pages': [root, page_1], 'links': [page_1, page_2]}, + {'pages': [page_2, page_3, page_4], + 'links': [root, page_3, page_4]}, + {root: "This is the root page.", + page_1: "This is page 1."}), + (2, + {'pages': [root, page_1, page_2], + 'links': [page_1, page_2, page_3, page_4]}, + {'pages': [page_3, page_4], 'links': [root]}, + {root: "This is the root page.", + page_1: "This is page 1.", + page_2: "This is page 2."}), + (3, + {'pages': [root, page_1, page_2, page_3, page_4], + 'links': [root, page_1, page_2, page_3, page_4]}, + {'pages': [], 'links': []}, + {root: "This is the root page.", + page_1: "This is page 1.", + page_2: "This is page 2.", + page_3: "This is page 3.", + page_4: "This is page 4."}), + ]) +def test_spider(depth, expected_found, expected_not_found, expected_text): + pages, links = spack.util.web.spider(root, depth=depth) + + for page in expected_found['pages']: + assert page in pages + + for page in expected_not_found['pages']: + assert page not in pages + + for link in expected_found['links']: + assert link in links + + for link in expected_not_found['links']: + assert link not in links + + for page, text in expected_text.items(): + assert text in pages[page] + + +def test_spider_no_response(monkeypatch): + # Mock the absence of a response + monkeypatch.setattr( + spack.util.web, 'read_from_url', lambda x, y: (None, None, None) + ) + pages, links = spack.util.web.spider(root, depth=0) + assert not pages and not links def test_find_versions_of_archive_0(): - versions = web_util.find_versions_of_archive( + versions = spack.util.web.find_versions_of_archive( root_tarball, root, list_depth=0) assert ver('0.0.0') in versions def test_find_versions_of_archive_1(): - versions = web_util.find_versions_of_archive( + versions = spack.util.web.find_versions_of_archive( root_tarball, root, list_depth=1) assert ver('0.0.0') in versions assert ver('1.0.0') in versions def test_find_versions_of_archive_2(): - versions = web_util.find_versions_of_archive( + versions = spack.util.web.find_versions_of_archive( root_tarball, root, list_depth=2) assert ver('0.0.0') in versions assert ver('1.0.0') in versions @@ -128,14 +104,14 @@ def test_find_versions_of_archive_2(): def test_find_exotic_versions_of_archive_2(): - versions = web_util.find_versions_of_archive( + versions = spack.util.web.find_versions_of_archive( root_tarball, root, list_depth=2) # up for grabs to make this better. 
assert ver('2.0.0b2') in versions def test_find_versions_of_archive_3(): - versions = web_util.find_versions_of_archive( + versions = spack.util.web.find_versions_of_archive( root_tarball, root, list_depth=3) assert ver('0.0.0') in versions assert ver('1.0.0') in versions @@ -145,7 +121,7 @@ def test_find_versions_of_archive_3(): def test_find_exotic_versions_of_archive_3(): - versions = web_util.find_versions_of_archive( + versions = spack.util.web.find_versions_of_archive( root_tarball, root, list_depth=3) assert ver('2.0.0b2') in versions assert ver('3.0a1') in versions @@ -159,35 +135,35 @@ def test_get_header(): # looking up headers should just work like a plain dict # lookup when there is an entry with the right key - assert(web_util.get_header(headers, 'Content-type') == 'text/plain') + assert(spack.util.web.get_header(headers, 'Content-type') == 'text/plain') # looking up headers should still work if there is a fuzzy match - assert(web_util.get_header(headers, 'contentType') == 'text/plain') + assert(spack.util.web.get_header(headers, 'contentType') == 'text/plain') # ...unless there is an exact match for the "fuzzy" spelling. headers['contentType'] = 'text/html' - assert(web_util.get_header(headers, 'contentType') == 'text/html') + assert(spack.util.web.get_header(headers, 'contentType') == 'text/html') # If lookup has to fallback to fuzzy matching and there are more than one # fuzzy match, the result depends on the internal ordering of the given # mapping - headers = OrderedDict() + headers = ordereddict_backport.OrderedDict() headers['Content-type'] = 'text/plain' headers['contentType'] = 'text/html' - assert(web_util.get_header(headers, 'CONTENT_TYPE') == 'text/plain') + assert(spack.util.web.get_header(headers, 'CONTENT_TYPE') == 'text/plain') del headers['Content-type'] - assert(web_util.get_header(headers, 'CONTENT_TYPE') == 'text/html') + assert(spack.util.web.get_header(headers, 'CONTENT_TYPE') == 'text/html') # Same as above, but different ordering - headers = OrderedDict() + headers = ordereddict_backport.OrderedDict() headers['contentType'] = 'text/html' headers['Content-type'] = 'text/plain' - assert(web_util.get_header(headers, 'CONTENT_TYPE') == 'text/html') + assert(spack.util.web.get_header(headers, 'CONTENT_TYPE') == 'text/html') del headers['contentType'] - assert(web_util.get_header(headers, 'CONTENT_TYPE') == 'text/plain') + assert(spack.util.web.get_header(headers, 'CONTENT_TYPE') == 'text/plain') # If there isn't even a fuzzy match, raise KeyError with pytest.raises(KeyError): - web_util.get_header(headers, 'ContentLength') + spack.util.web.get_header(headers, 'ContentLength') diff --git a/lib/spack/spack/util/web.py b/lib/spack/spack/util/web.py index 8039dc5fda..3f71dd1f71 100644 --- a/lib/spack/spack/util/web.py +++ b/lib/spack/spack/util/web.py @@ -7,17 +7,18 @@ from __future__ import print_function import codecs import errno -import re +import multiprocessing.pool import os import os.path +import re import shutil import ssl import sys import traceback -from six.moves.urllib.request import urlopen, Request +import six from six.moves.urllib.error import URLError -import multiprocessing.pool +from six.moves.urllib.request import urlopen, Request try: # Python 2 had these in the HTMLParser package. 
@@ -63,34 +64,6 @@ class LinkParser(HTMLParser): self.links.append(val) -class NonDaemonProcess(multiprocessing.Process): - """Process that allows sub-processes, so pools can have sub-pools.""" - @property - def daemon(self): - return False - - @daemon.setter - def daemon(self, value): - pass - - -if sys.version_info[0] < 3: - class NonDaemonPool(multiprocessing.pool.Pool): - """Pool that uses non-daemon processes""" - Process = NonDaemonProcess -else: - - class NonDaemonContext(type(multiprocessing.get_context())): # novm - Process = NonDaemonProcess - - class NonDaemonPool(multiprocessing.pool.Pool): - """Pool that uses non-daemon processes""" - - def __init__(self, *args, **kwargs): - kwargs['context'] = NonDaemonContext() - super(NonDaemonPool, self).__init__(*args, **kwargs) - - def uses_ssl(parsed_url): if parsed_url.scheme == 'https': return True @@ -336,107 +309,150 @@ def list_url(url): for key in _iter_s3_prefix(s3, url))) -def _spider(url, visited, root, depth, max_depth, raise_on_error): - """Fetches URL and any pages it links to up to max_depth. +def spider(root_urls, depth=0, concurrency=32): + """Get web pages from root URLs. - depth should initially be zero, and max_depth is the max depth of - links to follow from the root. + If depth is specified (e.g., depth=2), then this will also follow + up to levels of links from each root. - Prints out a warning only if the root can't be fetched; it ignores - errors with pages that the root links to. + Args: + root_urls (str or list of str): root urls used as a starting point + for spidering + depth (int): level of recursion into links + concurrency (int): number of simultaneous requests that can be sent - Returns a tuple of: - - pages: dict of pages visited (URL) mapped to their full text. - - links: set of links encountered while visiting the pages. + Returns: + A dict of pages visited (URL) mapped to their full text and the + set of visited links. """ - pages = {} # dict from page URL -> text content. - links = set() # set of all links seen on visited pages. + # Cache of visited links, meant to be captured by the closure below + _visited = set() + + def _spider(url, collect_nested): + """Fetches URL and any pages it links to. + + Prints out a warning only if the root can't be fetched; it ignores + errors with pages that the root links to. + + Args: + url (str): url being fetched and searched for links + collect_nested (bool): whether we want to collect arguments + for nested spidering on the links found in this url + + Returns: + A tuple of: + - pages: dict of pages visited (URL) mapped to their full text. + - links: set of links encountered while visiting the pages. + - spider_args: argument for subsequent call to spider + """ + pages = {} # dict from page URL -> text content. + links = set() # set of all links seen on visited pages. 
+ subcalls = [] - try: - response_url, _, response = read_from_url(url, 'text/html') - if not response_url or not response: - return pages, links + try: + response_url, _, response = read_from_url(url, 'text/html') + if not response_url or not response: + return pages, links, subcalls - page = codecs.getreader('utf-8')(response).read() - pages[response_url] = page + page = codecs.getreader('utf-8')(response).read() + pages[response_url] = page - # Parse out the links in the page - link_parser = LinkParser() - subcalls = [] - link_parser.feed(page) - - while link_parser.links: - raw_link = link_parser.links.pop() - abs_link = url_util.join( - response_url, - raw_link.strip(), - resolve_href=True) - links.add(abs_link) - - # Skip stuff that looks like an archive - if any(raw_link.endswith(suf) for suf in ALLOWED_ARCHIVE_TYPES): - continue + # Parse out the links in the page + link_parser = LinkParser() + link_parser.feed(page) - # Skip things outside the root directory - if not abs_link.startswith(root): - continue + while link_parser.links: + raw_link = link_parser.links.pop() + abs_link = url_util.join( + response_url, + raw_link.strip(), + resolve_href=True) + links.add(abs_link) - # Skip already-visited links - if abs_link in visited: - continue + # Skip stuff that looks like an archive + if any(raw_link.endswith(s) for s in ALLOWED_ARCHIVE_TYPES): + continue - # If we're not at max depth, follow links. - if depth < max_depth: - subcalls.append((abs_link, visited, root, - depth + 1, max_depth, raise_on_error)) - visited.add(abs_link) + # Skip already-visited links + if abs_link in _visited: + continue - if subcalls: - pool = NonDaemonPool(processes=len(subcalls)) - try: - results = pool.map(_spider_wrapper, subcalls) + # If we're not at max depth, follow links. + if collect_nested: + subcalls.append((abs_link,)) + _visited.add(abs_link) - for sub_pages, sub_links in results: - pages.update(sub_pages) - links.update(sub_links) + except URLError as e: + tty.debug(str(e)) - finally: - pool.terminate() - pool.join() + if hasattr(e, 'reason') and isinstance(e.reason, ssl.SSLError): + tty.warn("Spack was unable to fetch url list due to a " + "certificate verification problem. You can try " + "running spack -k, which will not check SSL " + "certificates. Use this at your own risk.") - except URLError as e: - tty.debug(e) + except HTMLParseError as e: + # This error indicates that Python's HTML parser sucks. + msg = "Got an error parsing HTML." - if hasattr(e, 'reason') and isinstance(e.reason, ssl.SSLError): - tty.warn("Spack was unable to fetch url list due to a certificate " - "verification problem. You can try running spack -k, " - "which will not check SSL certificates. Use this at your " - "own risk.") + # Pre-2.7.3 Pythons in particular have rather prickly HTML parsing. + if sys.version_info[:3] < (2, 7, 3): + msg += " Use Python 2.7.3 or newer for better HTML parsing." - if raise_on_error: - raise NoNetworkConnectionError(str(e), url) + tty.warn(msg, url, "HTMLParseError: " + str(e)) - except HTMLParseError as e: - # This error indicates that Python's HTML parser sucks. - msg = "Got an error parsing HTML." + except Exception as e: + # Other types of errors are completely ignored, + # except in debug mode + tty.debug("Error in _spider: %s:%s" % (type(e), str(e)), + traceback.format_exc()) - # Pre-2.7.3 Pythons in particular have rather prickly HTML parsing. - if sys.version_info[:3] < (2, 7, 3): - msg += " Use Python 2.7.3 or newer for better HTML parsing." 
+ finally: + tty.debug("SPIDER: [url={0}]".format(url)) - tty.warn(msg, url, "HTMLParseError: " + str(e)) + return pages, links, subcalls - except Exception as e: - # Other types of errors are completely ignored, except in debug mode. - tty.debug("Error in _spider: %s:%s" % (type(e), e), - traceback.format_exc()) + # TODO: Needed until we drop support for Python 2.X + def star(func): + def _wrapper(args): + return func(*args) + return _wrapper - return pages, links + if isinstance(root_urls, six.string_types): + root_urls = [root_urls] + # Clear the local cache of visited pages before starting the search + _visited.clear() -def _spider_wrapper(args): - """Wrapper for using spider with multiprocessing.""" - return _spider(*args) + current_depth = 0 + pages, links, spider_args = {}, set(), [] + + collect = current_depth < depth + for root in root_urls: + root = url_util.parse(root) + spider_args.append((root, collect)) + + tp = multiprocessing.pool.ThreadPool(processes=concurrency) + try: + while current_depth <= depth: + tty.debug("SPIDER: [depth={0}, max_depth={1}, urls={2}]".format( + current_depth, depth, len(spider_args)) + ) + results = tp.map(star(_spider), spider_args) + spider_args = [] + collect = current_depth < depth + for sub_pages, sub_links, sub_spider_args in results: + sub_spider_args = [x + (collect,) for x in sub_spider_args] + pages.update(sub_pages) + links.update(sub_links) + spider_args.extend(sub_spider_args) + + current_depth += 1 + finally: + tp.terminate() + tp.join() + + return pages, links def _urlopen(req, *args, **kwargs): @@ -460,37 +476,22 @@ def _urlopen(req, *args, **kwargs): return opener(req, *args, **kwargs) -def spider(root, depth=0): - """Gets web pages from a root URL. - - If depth is specified (e.g., depth=2), then this will also follow - up to levels of links from the root. - - This will spawn processes to fetch the children, for much improved - performance over a sequential fetch. - - """ - root = url_util.parse(root) - pages, links = _spider(root, set(), root, 0, depth, False) - return pages, links - - -def find_versions_of_archive(archive_urls, list_url=None, list_depth=0): +def find_versions_of_archive( + archive_urls, list_url=None, list_depth=0, concurrency=32 +): """Scrape web pages for new versions of a tarball. - Arguments: + Args: archive_urls (str or list or tuple): URL or sequence of URLs for different versions of a package. Typically these are just the tarballs from the package file itself. By default, this searches the parent directories of archives. - - Keyword Arguments: list_url (str or None): URL for a listing of archives. Spack will scrape these pages for download links that look like the archive URL. - - list_depth (int): Max depth to follow links on list_url pages. + list_depth (int): max depth to follow links on list_url pages. Defaults to 0. + concurrency (int): maximum number of concurrent requests """ if not isinstance(archive_urls, (list, tuple)): archive_urls = [archive_urls] @@ -511,12 +512,7 @@ def find_versions_of_archive(archive_urls, list_url=None, list_depth=0): list_urls |= additional_list_urls # Grab some web pages to scrape. - pages = {} - links = set() - for lurl in list_urls: - pg, lnk = spider(lurl, depth=list_depth) - pages.update(pg) - links.update(lnk) + pages, links = spider(list_urls, depth=list_depth, concurrency=concurrency) # Scrape them for archive URLs regexes = [] -- cgit v1.2.3-60-g2f50
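
The core of the change above is replacing recursive NonDaemonPool creation with a single
multiprocessing.pool.ThreadPool that walks links breadth-first, one depth level per pool.map()
call, so the number of in-flight requests never exceeds the concurrency limit. The standalone
sketch below illustrates that pattern only; fetch_links and the example URL are hypothetical
stand-ins for Spack's read_from_url()/LinkParser machinery, not the code in the patch:

    import multiprocessing.pool

    def fetch_links(url):
        """Hypothetical fetcher: return the set of links found at ``url``."""
        # In Spack this is read_from_url() + LinkParser; stubbed out here.
        return set()

    def crawl(root_urls, depth=0, concurrency=32):
        """Breadth-first crawl using one shared ThreadPool."""
        visited = set(root_urls)
        current = list(root_urls)   # URLs to fetch at the current depth
        all_links = set()

        pool = multiprocessing.pool.ThreadPool(processes=concurrency)
        try:
            for _ in range(depth + 1):
                if not current:
                    break
                # Fan one level of the crawl out across the shared pool;
                # no nested pools and no extra processes are ever created.
                results = pool.map(fetch_links, current)
                next_level = []
                for links in results:
                    all_links.update(links)
                    next_level.extend(l for l in links if l not in visited)
                    visited.update(links)
                current = next_level
        finally:
            pool.terminate()
            pool.join()

        return all_links

    if __name__ == '__main__':
        print(crawl(['https://example.com/index.html'], depth=1))

With the new flag added to the command in this patch, the fan-out of the shared pool can be
capped from the command line, for example:

    $ spack versions --concurrency 16 boost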