Merge "A new diskimage-builder command for yaml image builds"

This commit is contained in:
Zuul 2023-03-21 04:30:19 +00:00 committed by Gerrit Code Review
commit c214704614
6 changed files with 1172 additions and 0 deletions

View File

@ -0,0 +1,574 @@
# Copyright 2023 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import collections
import io
import jsonschema
import os
import os.path
import shlex
import subprocess
import sys
import textwrap
import yaml
import diskimage_builder.paths
class SchemaProperty(object):
"""Base class for a basic schema and a help string"""
key = None
description = None
schema_type = "string"
def __init__(self, key, description, schema_type=None):
self.key = key
self.description = description
if schema_type:
self.schema_type = schema_type
def to_schema(self):
return {self.key: {"type": self.schema_type}}
def type_help(self):
return "Value is a string"
def to_help(self):
return "%s\n%s\n%s\n(%s)" % (
self.key,
"-" * len(self.key),
"\n".join(textwrap.wrap(self.description)),
self.type_help(),
)
class Env(SchemaProperty):
"""String dict schema for environment variables"""
def __init__(self, key, description):
super(Env, self).__init__(key, description, schema_type="object")
def to_schema(self):
schema = super(Env, self).to_schema()
schema[self.key]["additionalProperties"] = {"type": "string"}
return schema
def type_help(self):
return "Value is a map of strings"
class Arg(SchemaProperty):
"""Command argument with associated value"""
arg = None
def __init__(self, key, description, schema_type=None, arg=None):
super(Arg, self).__init__(key, description, schema_type=schema_type)
self.arg = arg
def arg_name(self):
if self.arg is None:
return "--%s" % self.key
return self.arg
def to_argument(self, value=None):
arg = self.arg_name()
if value is not None and value != "":
return [arg, value]
return []
class Flag(Arg):
"""Boolean value which does not contribute to arguments"""
def __init__(self, key, description):
super(Flag, self).__init__(key, description, schema_type="boolean")
def to_argument(self, value=None):
return []
def type_help(self):
return "Value is a boolean"
class ArgFlag(Arg):
"""Boolean value for a flag argument being set or not"""
def __init__(self, key, description, arg=None):
super(ArgFlag, self).__init__(
key, description, arg=arg, schema_type="boolean"
)
def to_argument(self, value=None):
if value:
return [self.arg_name()]
return []
def type_help(self):
return "Value is a boolean"
class ArgEnum(Arg):
"""String argument constrained to a list of allowed values"""
enum = None
def __init__(
self, key, description, schema_type="string", arg=None, enum=None
):
super(ArgEnum, self).__init__(
key, description, schema_type=schema_type, arg=arg
)
self.enum = enum and enum or []
def to_schema(self):
schema = super(ArgEnum, self).to_schema()
schema[self.key]["enum"] = self.enum
return schema
def type_help(self):
return "Allowed values: %s" % ", ".join(self.enum)
class ArgFlagRepeating(ArgEnum):
"""Flag argument which repeats the specified number of times"""
def __init__(self, key, description, arg=None, max_repeat=0):
enum = list(range(max_repeat + 1))
super(ArgFlagRepeating, self).__init__(
key, description, schema_type="integer", arg=arg, enum=enum
)
def to_argument(self, value):
return [self.arg] * value
def type_help(self):
return "Allowed values: %s" % ", ".join([str(i) for i in self.enum])
class ArgInt(Arg):
"""Integer argument which a minumum constraint"""
minimum = 1
def __init__(self, key, description, arg=None, minimum=1):
super(ArgInt, self).__init__(
key, description, arg=arg, schema_type="integer"
)
self.minimum = minimum
def to_schema(self):
schema = super(ArgInt, self).to_schema()
schema[self.key]["minimum"] = self.minimum
return schema
def to_argument(self, value):
return super(ArgInt, self).to_argument(str(value))
def type_help(self):
return "Value is an integer"
class ArgList(Arg):
"""List of strings converted to comma delimited argument"""
def __init__(self, key, description, arg=None):
super(ArgList, self).__init__(
key, description, arg=arg, schema_type="array"
)
def to_schema(self):
schema = super(ArgList, self).to_schema()
schema[self.key]["items"] = {"type": "string"}
return schema
def to_argument(self, value):
if not value:
return []
return super(ArgList, self).to_argument(",".join(value))
def type_help(self):
return "Value is a list of strings"
class ArgListPositional(ArgList):
"""List of strings converted to positional arguments"""
def __init__(self, key, description):
super(ArgListPositional, self).__init__(key, description)
def to_argument(self, value):
# it is already a list, just return it
return value
def type_help(self):
return "Value is a list of strings"
class ArgEnumList(ArgList):
"""List of strings constrained to a list of allowed values"""
enum = None
def __init__(self, key, description, arg=None, enum=None):
super(ArgEnumList, self).__init__(key, description, arg=arg)
self.enum = enum and enum or []
def to_schema(self):
schema = super(ArgEnumList, self).to_schema()
schema[self.key]["items"]["enum"] = self.enum
return schema
def type_help(self):
return (
"Value is a list of strings with allowed values: %s)"
% ", ".join(self.enum)
)
class ArgDictToString(Arg):
"""Dict with string values converted to key=value,key2=value2 argument"""
def __init__(self, key, description, arg=None):
super(ArgDictToString, self).__init__(
key, description, arg=arg, schema_type="object"
)
def to_schema(self):
schema = super(ArgDictToString, self).to_schema()
schema[self.key]["additionalProperties"] = {"type": "string"}
return schema
def to_argument(self, value):
as_list = []
for k, v in value.items():
as_list.append("%s=%s" % (k, v))
return super(ArgDictToString, self).to_argument(",".join(as_list))
def type_help(self):
return "Value is a map of strings"
PROPERTIES = [
Arg("imagename", "Set the imagename of the output image file.", arg="-o"),
ArgEnum(
"arch",
"Set the architecture of the image.",
arg="-a",
enum=[
"aarch64",
"amd64",
"arm64",
"armhf",
"powerpc",
"ppc64",
"ppc64el",
"ppc64le",
"s390x",
"x86_64",
],
),
ArgEnumList(
"types",
"Set the image types of the output image files.",
arg="-t",
enum=[
"qcow2",
"tar",
"tgz",
"squashfs",
"vhd",
"docker",
"aci",
"raw",
],
),
Env(
"environment",
"Environment variables to set during the image build.",
),
Flag(
"ramdisk",
"Whether to build a ramdisk image.",
),
ArgFlagRepeating(
"debug-trace",
"Tracing level to log, integer 0 is off.",
arg="-x",
max_repeat=2,
),
ArgFlag(
"uncompressed",
"Do not compress the image - larger but faster.",
arg="-u",
),
ArgFlag("clear", "Clear environment before starting work.", arg="-c"),
Arg(
"logfile",
"Save run output to given logfile.",
),
ArgFlag(
"checksum",
"Generate MD5 and SHA256 checksum files for the created image.",
),
ArgInt(
"image-size",
"Image size in GB for the created image.",
),
ArgInt(
"image-extra-size",
"Extra image size in GB for the created image.",
),
Arg(
"image-cache",
"Location for cached images, defaults to ~/.cache/image-create.",
),
ArgInt(
"max-online-resize",
"Max number of filesystem blocks to support when resizing. "
"Useful if you want a really large root partition when the "
"image is deployed. Using a very large value may run into a "
"known bug in resize2fs. Setting the value to 274877906944 "
"will get you a 1PB root file system. Making this "
"value unnecessarily large will consume extra disk "
"space on the root partition with extra file system inodes.",
),
ArgInt(
"min-tmpfs",
"Minimum size in GB needed in tmpfs to build the image.",
),
ArgInt(
"mkfs-journal-size",
"Filesystem journal size in MB to pass to mkfs.",
),
Arg(
"mkfs-options",
"Option flags to be passed directly to mkfs.",
),
ArgFlag("no-tmpfs", "Do not use tmpfs to speed image build."),
ArgFlag("offline", "Do not update cached resources."),
ArgDictToString(
"qemu-img-options",
"Option flags to be passed directly to qemu-img.",
),
Arg(
"root-label",
'Label for the root filesystem, defaults to "cloudimg-rootfs".',
),
Arg(
"ramdisk-element",
"Specify the main element to be used for building ramdisks. "
'Defaults to "ramdisk". Should be set to "dracut-ramdisk" '
"for platforms such as RHEL and CentOS that do not package busybox.",
),
ArgEnum(
"install-type",
"Specify the default installation type.",
enum=["source", "package"],
),
Arg(
"docker-target",
"Specify the repo and tag to use if the output type is docker, "
"defaults to the value of output imagename.",
),
ArgList(
"packages",
"Extra packages to install in the image. Runs once, after "
'"install.d" phase. Does not apply when ramdisk is true.',
arg="-p",
),
ArgFlag(
"skip-base",
'Skip the default inclusion of the "base" element. '
"Does not apply when ramdisk is true.",
arg="-n",
),
ArgListPositional(
"elements",
"list of elements to build the image with",
),
]
SCHEMA_PROPERTIES = {}
for arg in PROPERTIES:
SCHEMA_PROPERTIES.update(arg.to_schema())
DIB_SCHEMA = {
"type": "array",
"items": {
"type": "object",
"properties": SCHEMA_PROPERTIES,
"additionalProperties": False,
},
"additionalProperties": False,
}
class Command(object):
script = None
args = None
environ = None
def __init__(self, script, properties, entry):
self.script = script
self.args = []
self.environ = {}
for prop in properties:
if prop.key in entry:
value = entry[prop.key]
if isinstance(prop, Env):
self.environ.update(value)
elif isinstance(prop, Arg):
self.args.extend(prop.to_argument(value))
def merged_env(self):
environ = os.environ.copy()
# pre-seed some paths for the shell script
environ["_LIB"] = diskimage_builder.paths.get_path("lib")
environ.update(self.environ)
return environ
def command(self):
return ["bash", self.script] + self.args
def __repr__(self):
elements = []
for k, v in self.environ.items():
elements.append("%s=%s" % (k, shlex.quote(v)))
elements.extend([shlex.quote(a) for a in self.command()])
return " ".join(elements) + "\n"
def help_properties():
str = io.StringIO()
for prop in PROPERTIES:
str.write(prop.to_help())
str.write("\n\n")
return str.getvalue()
def get_args():
description = (
"""\
The file format is YAML which expects a list of image definition maps.
Supported entries for an image definition are:
%s
"""
% help_properties()
)
parser = argparse.ArgumentParser(
description=description,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"files",
metavar="<filename>",
nargs="+",
help="Paths to image build definition YAML files",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show the disk-image-create, ramdisk-image-create commands and "
"exit",
)
parser.add_argument(
"--stop-on-failure",
action="store_true",
help="Stop building images when an image build fails",
)
args = parser.parse_args(sys.argv[1:])
return args
def merge_entry(merged_entry, entry):
for k, v in entry.items():
if isinstance(v, list):
# append to existing list
list_value = merged_entry.setdefault(k, [])
list_value.extend(v)
elif isinstance(v, dict):
# update environment dict
dict_value = merged_entry.setdefault(k, {})
dict_value.update(v)
else:
# update value
merged_entry[k] = v
def build_commands(definition):
jsonschema.validate(definition, schema=DIB_SCHEMA)
dib_script = "%s/disk-image-create" % diskimage_builder.paths.get_path(
"lib"
)
rib_script = "%s/ramdisk-image-create" % diskimage_builder.paths.get_path(
"lib"
)
# Start with the default image name, 'image'
previous_imagename = "image"
merged_entries = collections.OrderedDict()
for entry in definition:
imagename = entry.get("imagename", previous_imagename)
previous_imagename = imagename
if imagename not in merged_entries:
merged_entries[imagename] = entry
else:
merge_entry(merged_entries[imagename], entry)
commands = []
for entry in merged_entries.values():
if entry.get("ramdisk", False):
commands.append(Command(rib_script, PROPERTIES, entry))
else:
commands.append(Command(dib_script, PROPERTIES, entry))
return commands
def main():
args = get_args()
# export the path to the current python
if not os.environ.get("DIB_PYTHON_EXEC"):
os.environ["DIB_PYTHON_EXEC"] = sys.executable
definitions = []
for file in args.files:
with open(file) as f:
definitions.extend(yaml.safe_load(f))
commands = build_commands(definitions)
final_returncode = 0
failed_command = None
for command in commands:
sys.stderr.write(str(command))
sys.stderr.write("\n")
sys.stderr.flush()
if not args.dry_run:
p = subprocess.Popen(command.command(), env=command.merged_env())
p.communicate()
if p.returncode != 0:
final_returncode = p.returncode
failed_command = command
if args.stop_on_failure:
break
if final_returncode != 0:
raise subprocess.CalledProcessError(
final_returncode, failed_command.command()
)

View File

@ -0,0 +1,587 @@
# Copyright 2023 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import jsonschema
import mock
import subprocess
import tempfile
import testtools
import os
import yaml
from diskimage_builder import diskimage_builder as dib
class TestDib(testtools.TestCase):
def assert_to_argument(self, expected, prop, value):
self.assertEqual(expected, prop.to_argument(value))
self.assert_schema_validate(prop, value)
def assert_schema_validate(self, prop, value, assert_failure=False):
# create a fake dict value and validate the schema against it
value_dict = {prop.key: value}
schema = {"type": "object", "properties": {}}
schema["properties"].update(prop.to_schema())
if assert_failure:
self.assertRaises(
jsonschema.exceptions.ValidationError,
jsonschema.validate,
value_dict,
schema=schema,
)
else:
jsonschema.validate(value_dict, schema=schema)
def test_schema_property(self):
x = dib.SchemaProperty(
"the_key", "the description", schema_type="integer"
)
self.assertEqual(
"""\
the_key
-------
the description
(Value is a string)""",
x.to_help(),
)
self.assertEqual({"the_key": {"type": "integer"}}, x.to_schema())
def test_env(self):
x = dib.Env(
"environment",
"environment variables to set during the image build",
)
self.assertEqual(
{
"environment": {
"additionalProperties": {"type": "string"},
"type": "object",
}
},
x.to_schema(),
)
def test_arg(self):
# no arg
x = dib.Arg("the-key", "")
self.assert_to_argument(["--the-key", "the value"], x, "the value")
# with arg
x = dib.Arg("the-key", "", arg="--key")
self.assert_to_argument(["--key", "the value"], x, "the value")
# with empty string value
self.assert_to_argument([], x, "")
def test_arg_flag(self):
x = dib.ArgFlag("doit", "do it", arg="-d")
self.assertEqual({"doit": {"type": "boolean"}}, x.to_schema())
# false value
self.assert_to_argument([], x, False)
# true value
self.assert_to_argument(["-d"], x, True)
def test_arg_enum(self):
x = dib.ArgEnum("choice", "", enum=["one", "two", "three"])
self.assertEqual(
{"choice": {"type": "string", "enum": ["one", "two", "three"]}},
x.to_schema(),
)
self.assert_to_argument(["--choice", "one"], x, "one")
self.assert_schema_validate(x, "two")
self.assert_schema_validate(x, "four", assert_failure=True)
def test_arg_flag_repeating(self):
x = dib.ArgFlagRepeating("log_level", "", arg="-v", max_repeat=3)
self.assertEqual(
{"log_level": {"type": "integer", "enum": [0, 1, 2, 3]}},
x.to_schema(),
)
self.assert_to_argument([], x, 0)
self.assert_to_argument(["-v"], x, 1)
self.assert_to_argument(["-v", "-v"], x, 2)
self.assert_to_argument(["-v", "-v", "-v"], x, 3)
def test_arg_int(self):
x = dib.ArgInt("size", "")
self.assertEqual(
{"size": {"type": "integer", "minimum": 1}}, x.to_schema()
)
self.assert_to_argument(["--size", "10"], x, 10)
self.assert_schema_validate(x, -1, assert_failure=True)
def test_arg_list(self):
x = dib.ArgList("packages", "", arg="-p")
self.assertEqual(
{"packages": {"type": "array", "items": {"type": "string"}}},
x.to_schema(),
)
self.assert_to_argument(
["-p", "wget,vim-enhanced"], x, ["wget", "vim-enhanced"]
)
self.assert_to_argument([], x, [])
def test_arg_list_position(self):
x = dib.ArgListPositional("elements", "")
self.assert_to_argument(
["centos", "vm", "bootloader"], x, ["centos", "vm", "bootloader"]
)
def test_arg_enum_list(self):
x = dib.ArgEnumList(
"types", "", arg="-t", enum=["qcow2", "tar", "raw"]
)
self.assertEqual(
{
"types": {
"items": {
"enum": ["qcow2", "tar", "raw"],
"type": "string",
},
"type": "array",
}
},
x.to_schema(),
)
self.assert_to_argument(["-t", "qcow2,raw"], x, ["qcow2", "raw"])
def test_arg_dict_to_string(self):
x = dib.ArgDictToString("options", "")
self.assertEqual(
{
"options": {
"additionalProperties": {"type": "string"},
"type": "object",
}
},
x.to_schema(),
)
self.assert_to_argument(
["--options", "foo=bar,baz=ingo"],
x,
{"foo": "bar", "baz": "ingo"},
)
def test_command_merged_env(self):
c = dib.Command(
"echo",
properties=[dib.Env("environment", "")],
entry={
"environment": {
"ELEMENTS_PATH": "/path/to/elements",
"DIB_CLOUD_IMAGES": "/path/to/image.qcow2",
}
},
)
env = c.merged_env()
self.assertEqual("/path/to/elements", env["ELEMENTS_PATH"])
self.assertEqual("/path/to/image.qcow2", env["DIB_CLOUD_IMAGES"])
self.assertIn(needle="_LIB", haystack=env)
# this will be merged with the whole environment, so there will be
# more than 3 values
self.assertGreater(len(env), 3)
def test_command(self):
c = dib.Command(
"echo",
properties=[
dib.Env("environment", ""),
dib.Arg("the-key", "", arg="--key"),
dib.ArgFlag("doit", "do it", arg="-d"),
dib.ArgFlagRepeating("verbose", "", arg="-v", max_repeat=3),
dib.ArgDictToString("options", ""),
dib.ArgListPositional("elements", ""),
],
entry={
"environment": {
"ELEMENTS_PATH": "/path/to/elements",
"DIB_CLOUD_IMAGES": "~/image.qcow2",
},
"the-key": "the-value",
"doit": True,
"verbose": 3,
"options": {"foo": "bar"},
"elements": ["centos", "vm"],
},
)
self.assertEqual(
[
"bash",
"echo",
"--key",
"the-value",
"-d",
"-v",
"-v",
"-v",
"--options",
"foo=bar",
"centos",
"vm",
],
c.command(),
)
self.assertEqual(
"ELEMENTS_PATH=/path/to/elements "
"DIB_CLOUD_IMAGES='~/image.qcow2' "
"bash echo --key the-value -d -v -v -v --options foo=bar "
"centos vm\n",
str(c),
)
def test_merge_entry(self):
# override normal attributes
merged_entry = {
"imagename": "image1",
"elements": ["one", "two", "three"],
"debug-trace": 1,
}
dib.merge_entry(
merged_entry,
{
"imagename": "image1",
"debug-trace": 2,
"logfile": "image1.log",
},
)
self.assertEqual(
{
"imagename": "image1",
"elements": ["one", "two", "three"],
"debug-trace": 2,
"logfile": "image1.log",
},
merged_entry,
)
# append list attributes, update dict attributes
merged_entry = {
"imagename": "image1",
"elements": ["one", "two", "three"],
"environment": {
"DIB_ONE": "1",
"DIB_TWO": "2",
},
}
dib.merge_entry(
merged_entry,
{
"imagename": "image1",
"elements": ["four", "five"],
"environment": {
"DIB_TWO": "two",
"DIB_THREE": "three",
},
},
)
self.assertEqual(
{
"imagename": "image1",
"elements": ["one", "two", "three", "four", "five"],
"environment": {
"DIB_ONE": "1",
"DIB_TWO": "two",
"DIB_THREE": "three",
},
},
merged_entry,
)
@mock.patch("diskimage_builder.paths.get_path")
def test_build_commands_simple(self, mock_get_path):
mock_get_path.return_value = "/lib"
commands = dib.build_commands(
[
{
"imagename": "centos-minimal",
"elements": ["centos", "vm"],
},
{
"imagename": "ironic-python-agent",
"ramdisk": True,
"elements": [
"ironic-python-agent-ramdisk",
"extra-hardware",
],
},
]
)
self.assertEqual(
[
"bash",
"/lib/disk-image-create",
"-o",
"centos-minimal",
"centos",
"vm",
],
commands[0].command(),
)
self.assertEqual(
[
"bash",
"/lib/ramdisk-image-create",
"-o",
"ironic-python-agent",
"ironic-python-agent-ramdisk",
"extra-hardware",
],
commands[1].command(),
)
@mock.patch("diskimage_builder.paths.get_path")
def test_build_commands_merged(self, mock_get_path):
mock_get_path.return_value = "/lib"
commands = dib.build_commands(
[
{ # base definition
"imagename": "centos-minimal",
"elements": ["centos", "vm"],
"environment": {"foo": "bar", "zip": "zap"},
},
{ # merge with previous when no imagename
"elements": ["devuser"],
"logfile": "centos-minimal.log",
},
{ # merge when same imagename
"imagename": "centos-minimal",
"environment": {"foo": "baz", "one": "two"},
},
]
)
self.assertEqual(1, len(commands))
self.assertEqual(
[
"bash",
"/lib/disk-image-create",
"-o",
"centos-minimal",
"--logfile",
"centos-minimal.log",
"centos",
"vm",
"devuser",
],
commands[0].command(),
)
@mock.patch("diskimage_builder.paths.get_path")
def test_build_commands_all_arguments(self, mock_get_path):
mock_get_path.return_value = "/lib"
commands = dib.build_commands(
[
{
"arch": "amd64",
"imagename": "everyoption",
"types": ["qcow2"],
"debug-trace": 2,
"uncompressed": True,
"clear": True,
"logfile": "./logfile.log",
"checksum": True,
"image-size": 40,
"image-extra-size": 1,
"image-cache": "~/.cache/dib",
"max-online-resize": 1000,
"min-tmpfs": 7,
"mkfs-journal-size": 1,
"mkfs-options": "-D",
"no-tmpfs": True,
"offline": True,
"qemu-img-options": {"size": "10"},
"ramdisk-element": "dracut-ramdisk",
"install-type": "package",
"root-label": "root",
"docker-target": "everyoption:latest",
"packages": ["wget", "tmux"],
"skip-base": True,
"elements": ["centos", "vm"],
}
]
)
self.assertEqual(
[
"bash",
"/lib/disk-image-create",
"-o",
"everyoption",
"-a",
"amd64",
"-t",
"qcow2",
"-x",
"-x",
"-u",
"-c",
"--logfile",
"./logfile.log",
"--checksum",
"--image-size",
"40",
"--image-extra-size",
"1",
"--image-cache",
"~/.cache/dib",
"--max-online-resize",
"1000",
"--min-tmpfs",
"7",
"--mkfs-journal-size",
"1",
"--mkfs-options",
"-D",
"--no-tmpfs",
"--offline",
"--qemu-img-options",
"size=10",
"--root-label",
"root",
"--ramdisk-element",
"dracut-ramdisk",
"--install-type",
"package",
"--docker-target",
"everyoption:latest",
"-p",
"wget,tmux",
"-n",
"centos",
"vm",
],
commands[0].command(),
)
def write_image_definition(self):
image_def = [
{
"imagename": "centos-minimal",
"elements": ["centos", "vm"],
},
{
"imagename": "ironic-python-agent",
"ramdisk": True,
"elements": [
"ironic-python-agent-ramdisk",
"extra-hardware",
],
},
]
with tempfile.NamedTemporaryFile(delete=False) as deffile:
self.addCleanup(os.remove, deffile.name)
self.filelist = [deffile.name]
with open(deffile.name, "w") as f:
f.write(yaml.dump(image_def))
return deffile.name
@mock.patch("diskimage_builder.paths.get_path")
@mock.patch("diskimage_builder.diskimage_builder.get_args")
@mock.patch("subprocess.Popen")
def test_main_dry_run(self, mock_popen, mock_get_args, mock_get_path):
mock_get_path.return_value = "/lib"
mock_get_args.return_value = mock.Mock(
dry_run=True, files=[self.write_image_definition()]
)
dib.main()
mock_popen.assert_not_called()
@mock.patch("diskimage_builder.paths.get_path")
@mock.patch("diskimage_builder.diskimage_builder.get_args")
@mock.patch("subprocess.Popen")
def test_main(self, mock_popen, mock_get_args, mock_get_path):
mock_get_path.return_value = "/lib"
mock_get_args.return_value = mock.Mock(
dry_run=False,
files=[self.write_image_definition()],
stop_on_failure=False,
)
process = mock.Mock()
process.returncode = 0
mock_popen.return_value = process
dib.main()
self.assertEqual(2, mock_popen.call_count)
mock_popen.assert_has_calls(
[
mock.call(
[
"bash",
"/lib/disk-image-create",
"-o",
"centos-minimal",
"centos",
"vm",
],
env=mock.ANY,
),
mock.call(
[
"bash",
"/lib/ramdisk-image-create",
"-o",
"ironic-python-agent",
"ironic-python-agent-ramdisk",
"extra-hardware",
],
env=mock.ANY,
),
],
any_order=True,
)
@mock.patch("diskimage_builder.paths.get_path")
@mock.patch("diskimage_builder.diskimage_builder.get_args")
@mock.patch("subprocess.Popen")
def test_main_stop_on_failure(
self, mock_popen, mock_get_args, mock_get_path
):
mock_get_path.return_value = "/lib"
mock_get_args.return_value = mock.Mock(
dry_run=False,
files=[self.write_image_definition()],
stop_on_failure=True,
)
process = mock.Mock()
process.returncode = -1
mock_popen.return_value = process
e = self.assertRaises(subprocess.CalledProcessError, dib.main)
self.assertEqual(1, mock_popen.call_count)
self.assertEqual(-1, e.returncode)
@mock.patch("diskimage_builder.paths.get_path")
@mock.patch("diskimage_builder.diskimage_builder.get_args")
@mock.patch("subprocess.Popen")
def test_main_continue_on_failure(
self, mock_popen, mock_get_args, mock_get_path
):
mock_get_path.return_value = "/lib"
mock_get_args.return_value = mock.Mock(
dry_run=False,
files=[self.write_image_definition()],
stop_on_failure=False,
)
process = mock.Mock()
process.returncode.side_effect = -1
mock_popen.return_value = process
self.assertRaises(subprocess.CalledProcessError, dib.main)
self.assertEqual(2, mock_popen.call_count)

View File

@ -13,6 +13,7 @@ imagesize==0.7.1
iso8601==0.1.11
isort==4.3.4
Jinja2==2.10
jsonschema==3.2.0
keystoneauth1==3.4.0
lazy-object-proxy==1.3.1
linecache2==1.0.0

View File

@ -0,0 +1,8 @@
---
features:
- |
A new command ``diskimage-builder`` is now provided (also aliased to
``dib``). It provides a YAML file based interface to ``disk-image-create``
and ``ramdisk-image-create``. All arguments can be specified as yaml
dictionary entries, along with environment variable overrides which are
applied over the provided environment.

View File

@ -8,3 +8,4 @@ PyYAML>=3.12 # MIT
stevedore>=1.20.0 # Apache-2.0
# NOTE(ianw) in here because dib-lint uses flake8
flake8<6.0.0,>=3.6.0 # MIT
jsonschema>=3.2.0 # MIT

View File

@ -35,6 +35,7 @@ data_files =
console_scripts =
disk-image-create = diskimage_builder.disk_image_create:main
ramdisk-image-create = diskimage_builder.disk_image_create:main
diskimage-builder = diskimage_builder.diskimage_builder:main
diskimage_builder.block_device.plugin =
local_loop = diskimage_builder.block_device.level0.localloop:LocalLoop