summaryrefslogtreecommitdiff
path: root/lib/spack/spack/oci/oci.py
blob: 4e5e196cd10db914d5efdd72eb94d845e4d3eb52 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
# Copyright 2013-2023 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)

import hashlib
import json
import os
import time
import urllib.error
import urllib.parse
import urllib.request
from http.client import HTTPResponse
from typing import NamedTuple, Tuple
from urllib.request import Request

import llnl.util.tty as tty

import spack.binary_distribution
import spack.config
import spack.error
import spack.fetch_strategy
import spack.mirror
import spack.oci.opener
import spack.repo
import spack.spec
import spack.stage
import spack.traverse
import spack.util.crypto

from .image import Digest, ImageReference


class Blob(NamedTuple):
    """Digests and size describing a blob stored in an OCI registry."""

    # Digest of the compressed form of the blob
    compressed_digest: Digest
    # Digest of the uncompressed form of the blob
    uncompressed_digest: Digest
    # Size in bytes — presumably of the compressed blob; not used within
    # this file, confirm at call sites.
    size: int


def create_tarball(spec: spack.spec.Spec, tarfile_path):
    """Create a binary-distribution tarball of ``spec``'s install prefix.

    Args:
        spec: The installed spec whose prefix is archived.
        tarfile_path: Path where the tarball is written.

    Returns:
        Whatever ``spack.binary_distribution._do_create_tarball`` returns.
    """
    # Build-metadata dict is embedded in the tarball alongside the prefix.
    info = spack.binary_distribution.get_buildinfo_dict(spec)
    return spack.binary_distribution._do_create_tarball(tarfile_path, spec.prefix, info)


def _log_upload_progress(digest: Digest, size: int, elapsed: float):
    """Log the digest, duration, and effective throughput of a finished upload."""
    # Clamp tiny durations so the MB/s computation cannot divide by zero.
    duration = max(elapsed, 0.001)
    tty.info(f"Uploaded {digest} ({duration:.2f}s, {size / duration / 1024 / 1024:.2f} MB/s)")


def with_query_param(url: str, param: str, value: str) -> str:
    """Return ``url`` with ``value`` appended to the ``param`` query parameter.

    Existing values for ``param`` are preserved; the new value is added
    after them.

    Args:
        url: The URL to add the parameter to.
        param: The parameter name.
        value: The parameter value.

    Returns:
        The URL with the parameter added.
    """
    parts = urllib.parse.urlparse(url)
    params = urllib.parse.parse_qs(parts.query)
    # setdefault covers both the "already present" and "new key" cases.
    params.setdefault(param, []).append(value)
    new_query = urllib.parse.urlencode(params, doseq=True)
    return urllib.parse.urlunparse(parts._replace(query=new_query))


def upload_blob(
    ref: ImageReference,
    file: str,
    digest: Digest,
    force: bool = False,
    small_file_size: int = 0,
    _urlopen: spack.oci.opener.MaybeOpen = None,
) -> bool:
    """Uploads a blob to an OCI registry

    We only do monolithic uploads, even though it's very simple to do chunked.
    Observed problems with chunked uploads:
    (1) it's slow, many sequential requests, (2) some registries set an *unknown*
    max chunk size, and the spec doesn't say how to obtain it

    Args:
        ref: The image reference.
        file: The file to upload.
        digest: The digest of the file.
        force: Whether to force upload the blob, even if it already exists.
        small_file_size: For files at most this size, attempt
            to do a single POST request instead of POST + PUT.
            Some registries do not support single requests, and others
            do not specify what size they support in single POST.
            For now this feature is disabled by default (0KB)
        _urlopen: Optional opener; defaults to spack.oci.opener.urlopen.

    Returns:
        True if the blob was uploaded, False if it already existed.
    """
    _urlopen = _urlopen or spack.oci.opener.urlopen

    # Test if the blob already exists, if so, early exit.
    if not force and blob_exists(ref, digest, _urlopen):
        return False

    start = time.time()

    with open(file, "rb") as f:
        file_size = os.fstat(f.fileno()).st_size

        # For small blobs, do a single POST request.
        # The spec says that registries MAY support this
        if file_size <= small_file_size:
            # Single-request upload: the file object itself is streamed as
            # the POST body, with the digest embedded in the upload URL.
            request = Request(
                url=ref.uploads_url(digest),
                method="POST",
                data=f,
                headers={
                    "Content-Type": "application/octet-stream",
                    "Content-Length": str(file_size),
                },
            )
        else:
            # Two-step upload: an empty POST obtains an upload location,
            # the data follows in the PUT below.
            request = Request(
                url=ref.uploads_url(), method="POST", headers={"Content-Length": "0"}
            )

        response = _urlopen(request)

        # Created the blob in one go.
        if response.status == 201:
            _log_upload_progress(digest, file_size, time.time() - start)
            return True

        # Otherwise, do another PUT request.
        # (A registry that does not support single-request uploads answers
        # the POST with 202 Accepted and a Location header instead.)
        spack.oci.opener.ensure_status(response, 202)
        assert "Location" in response.headers

        # Can be absolute or relative, joining handles both
        upload_url = with_query_param(
            ref.endpoint(response.headers["Location"]), "digest", str(digest)
        )
        # The small-file POST may already have consumed the file object;
        # rewind so the PUT streams it from the beginning.
        f.seek(0)

        response = _urlopen(
            Request(
                url=upload_url,
                method="PUT",
                data=f,
                headers={
                    "Content-Type": "application/octet-stream",
                    "Content-Length": str(file_size),
                },
            )
        )

        spack.oci.opener.ensure_status(response, 201)

    # print elapsed time and # MB/s
    _log_upload_progress(digest, file_size, time.time() - start)
    return True


def upload_manifest(
    ref: ImageReference,
    oci_manifest: dict,
    tag: bool = True,
    _urlopen: spack.oci.opener.MaybeOpen = None,
):
    """Uploads a manifest/index to a registry

    Args:
        ref: The image reference.
        oci_manifest: The OCI manifest or index.
        tag: When true, use the tag, otherwise use the digest,
            this is relevant for multi-arch images, where the
            tag is an index, referencing the manifests by digest.
        _urlopen: Optional opener; defaults to spack.oci.opener.urlopen.

    Returns:
        The digest and size of the uploaded manifest.
    """
    _urlopen = _urlopen or spack.oci.opener.urlopen

    # Canonical compact encoding; the digest is computed over these bytes.
    payload = json.dumps(oci_manifest, separators=(",", ":")).encode()
    checksum = Digest.from_sha256(hashlib.sha256(payload).hexdigest())

    # Push by digest rather than tag when requested (multi-arch indices
    # reference their manifests by digest).
    if not tag:
        ref = ref.with_digest(checksum)

    request = Request(
        url=ref.manifest_url(),
        method="PUT",
        data=payload,
        headers={"Content-Type": oci_manifest["mediaType"]},
    )
    response = _urlopen(request)

    spack.oci.opener.ensure_status(response, 201)
    return checksum, len(payload)


def image_from_mirror(mirror: spack.mirror.Mirror) -> ImageReference:
    """Given an OCI based mirror, extract the URL and image name from it

    Raises:
        ValueError: when the mirror's push URL is not an ``oci://`` URL.
    """
    push_url = mirror.push_url
    prefix = "oci://"
    if not push_url.startswith(prefix):
        raise ValueError(f"Mirror {mirror} is not an OCI mirror")
    # Strip the scheme; the remainder is the image reference.
    return ImageReference.from_string(push_url[len(prefix) :])


def blob_exists(
    ref: ImageReference, digest: Digest, _urlopen: spack.oci.opener.MaybeOpen = None
) -> bool:
    """Checks if a blob exists in an OCI registry

    A HEAD request against the blob URL answers the question; a 404 from
    the registry means "does not exist", any other HTTP error propagates.
    """
    try:
        opener = _urlopen or spack.oci.opener.urlopen
        head_request = Request(url=ref.blob_url(digest), method="HEAD")
        return opener(head_request).status == 200
    except urllib.error.HTTPError as e:
        if e.getcode() == 404:
            return False
        raise


def copy_missing_layers(
    src: ImageReference,
    dst: ImageReference,
    architecture: str,
    _urlopen: spack.oci.opener.MaybeOpen = None,
) -> Tuple[dict, dict]:
    """Copy image layers from src to dst for given architecture.

    Args:
        src: The source image reference.
        dst: The destination image reference.
        architecture: The architecture (when referencing an index)
        _urlopen: Optional opener; defaults to spack.oci.opener.urlopen.

    Returns:
        Tuple of manifest and config of the base image.
    """
    _urlopen = _urlopen or spack.oci.opener.urlopen
    manifest, config = get_manifest_and_config(src, architecture, _urlopen=_urlopen)

    # Get layer digests
    digests = [Digest.from_string(layer["digest"]) for layer in manifest["layers"]]

    # Filter digests that don't exist in the destination registry
    missing_digests = [
        digest for digest in digests if not blob_exists(dst, digest, _urlopen=_urlopen)
    ]

    # Nothing to copy — all layers already present at the destination.
    if not missing_digests:
        return manifest, config

    # Pull missing blobs, push them to the registry. The stages are
    # composed so fetch/check/cache run over the whole batch at once.
    with spack.stage.StageComposite.from_iterable(
        make_stage(url=src.blob_url(digest), digest=digest, _urlopen=_urlopen)
        for digest in missing_digests
    ) as stages:
        stages.fetch()
        stages.check()
        stages.cache_local()

        for stage, digest in zip(stages, missing_digests):
            # No need to check existence again, force=True.
            upload_blob(
                dst, file=stage.save_filename, force=True, digest=digest, _urlopen=_urlopen
            )

    return manifest, config


#: OCI manifest content types (including docker type)
manifest_content_type = [
    "application/vnd.oci.image.manifest.v1+json",
    "application/vnd.docker.distribution.manifest.v2+json",
]

#: OCI index content types (including docker type)
index_content_type = [
    "application/vnd.oci.image.index.v1+json",
    "application/vnd.docker.distribution.manifest.list.v2+json",
]

#: All OCI manifest / index content types
all_content_type = manifest_content_type + index_content_type


def get_manifest_and_config(
    ref: ImageReference,
    architecture="amd64",
    recurse=3,
    _urlopen: spack.oci.opener.MaybeOpen = None,
) -> Tuple[dict, dict]:
    """Recursively fetch manifest and config for a given image reference
    with a given architecture.

    Args:
        ref: The image reference.
        architecture: The architecture (when referencing an index)
        recurse: How many levels of index to recurse into.
        _urlopen: Optional opener; defaults to spack.oci.opener.urlopen.

    Returns:
        A tuple of (manifest, config)

    Raises:
        Exception: when the recursion limit is reached, when the registry
            responds with an unknown content type, or when an index has no
            manifest for the requested architecture.
    """

    _urlopen = _urlopen or spack.oci.opener.urlopen

    # Get manifest
    response: HTTPResponse = _urlopen(
        Request(url=ref.manifest_url(), headers={"Accept": ", ".join(all_content_type)})
    )

    # Recurse when we find an index
    if response.headers["Content-Type"] in index_content_type:
        if recurse == 0:
            raise Exception("Maximum recursion depth reached while fetching OCI manifest")

        index = json.load(response)

        # A bare next(...) here would raise an opaque StopIteration when the
        # index has no entry for the requested architecture; raise a clear
        # error instead (StopIteration is also an Exception subclass, so
        # callers catching Exception are unaffected by this change).
        manifest_meta = next(
            (
                manifest
                for manifest in index["manifests"]
                if manifest["platform"]["architecture"] == architecture
            ),
            None,
        )
        if manifest_meta is None:
            raise Exception(f"No manifest found for architecture {architecture}")

        return get_manifest_and_config(
            ref.with_digest(manifest_meta["digest"]),
            architecture=architecture,
            recurse=recurse - 1,
            _urlopen=_urlopen,
        )

    # Otherwise, require a manifest
    if response.headers["Content-Type"] not in manifest_content_type:
        raise Exception(f"Unknown content type {response.headers['Content-Type']}")

    manifest = json.load(response)

    # Download, verify and cache config file
    config_digest = Digest.from_string(manifest["config"]["digest"])
    with make_stage(ref.blob_url(config_digest), config_digest, _urlopen=_urlopen) as stage:
        stage.fetch()
        stage.check()
        stage.cache_local()
        with open(stage.save_filename, "rb") as f:
            config = json.load(f)

    return manifest, config


#: Same as upload_manifest, but with retry wrapper
#: (retry behavior is defined by spack.oci.opener.default_retry)
upload_manifest_with_retry = spack.oci.opener.default_retry(upload_manifest)

#: Same as upload_blob, but with retry wrapper
upload_blob_with_retry = spack.oci.opener.default_retry(upload_blob)

#: Same as get_manifest_and_config, but with retry wrapper
get_manifest_and_config_with_retry = spack.oci.opener.default_retry(get_manifest_and_config)

#: Same as copy_missing_layers, but with retry wrapper
copy_missing_layers_with_retry = spack.oci.opener.default_retry(copy_missing_layers)


def make_stage(
    url: str, digest: Digest, keep: bool = False, _urlopen: spack.oci.opener.MaybeOpen = None
) -> spack.stage.Stage:
    """Create a Stage that fetches a blob from ``url`` and verifies it
    against ``digest``.

    Args:
        url: Blob URL to fetch from.
        digest: Expected digest; used as checksum and stage name.
        keep: Whether to keep the stage directory after use.
        _urlopen: Optional opener; defaults to spack.oci.opener.urlopen.
    """
    opener = _urlopen or spack.oci.opener.urlopen
    strategy = spack.fetch_strategy.OCIRegistryFetchStrategy(
        url, checksum=digest.digest, _urlopen=opener
    )
    # Use blobs/<alg>/<encoded> as the cache path, which follows
    # the OCI Image Layout Specification. What's missing though,
    # is the `oci-layout` and `index.json` files, which are
    # required by the spec.
    return spack.stage.Stage(
        strategy, mirror_paths=spack.mirror.OCIImageLayout(digest), name=digest.digest, keep=keep
    )