588 lines
18 KiB
Python
588 lines
18 KiB
Python
|
# Copyright 2023 Red Hat, Inc.
|
||
|
#
|
||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||
|
# not use this file except in compliance with the License. You may obtain
|
||
|
# a copy of the License at
|
||
|
#
|
||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||
|
#
|
||
|
# Unless required by applicable law or agreed to in writing, software
|
||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||
|
# License for the specific language governing permissions and limitations
|
||
|
# under the License.
|
||
|
|
||
|
import jsonschema
|
||
|
import mock
|
||
|
import subprocess
|
||
|
import tempfile
|
||
|
import testtools
|
||
|
import os
|
||
|
import yaml
|
||
|
|
||
|
from diskimage_builder import diskimage_builder as dib
|
||
|
|
||
|
|
||
|
class TestDib(testtools.TestCase):
|
||
|
def assert_to_argument(self, expected, prop, value):
|
||
|
self.assertEqual(expected, prop.to_argument(value))
|
||
|
self.assert_schema_validate(prop, value)
|
||
|
|
||
|
def assert_schema_validate(self, prop, value, assert_failure=False):
|
||
|
# create a fake dict value and validate the schema against it
|
||
|
value_dict = {prop.key: value}
|
||
|
schema = {"type": "object", "properties": {}}
|
||
|
schema["properties"].update(prop.to_schema())
|
||
|
if assert_failure:
|
||
|
self.assertRaises(
|
||
|
jsonschema.exceptions.ValidationError,
|
||
|
jsonschema.validate,
|
||
|
value_dict,
|
||
|
schema=schema,
|
||
|
)
|
||
|
else:
|
||
|
jsonschema.validate(value_dict, schema=schema)
|
||
|
|
||
|
def test_schema_property(self):
|
||
|
x = dib.SchemaProperty(
|
||
|
"the_key", "the description", schema_type="integer"
|
||
|
)
|
||
|
self.assertEqual(
|
||
|
"""\
|
||
|
the_key
|
||
|
-------
|
||
|
the description
|
||
|
(Value is a string)""",
|
||
|
x.to_help(),
|
||
|
)
|
||
|
self.assertEqual({"the_key": {"type": "integer"}}, x.to_schema())
|
||
|
|
||
|
def test_env(self):
|
||
|
x = dib.Env(
|
||
|
"environment",
|
||
|
"environment variables to set during the image build",
|
||
|
)
|
||
|
self.assertEqual(
|
||
|
{
|
||
|
"environment": {
|
||
|
"additionalProperties": {"type": "string"},
|
||
|
"type": "object",
|
||
|
}
|
||
|
},
|
||
|
x.to_schema(),
|
||
|
)
|
||
|
|
||
|
def test_arg(self):
|
||
|
# no arg
|
||
|
x = dib.Arg("the-key", "")
|
||
|
self.assert_to_argument(["--the-key", "the value"], x, "the value")
|
||
|
|
||
|
# with arg
|
||
|
x = dib.Arg("the-key", "", arg="--key")
|
||
|
self.assert_to_argument(["--key", "the value"], x, "the value")
|
||
|
|
||
|
# with empty string value
|
||
|
self.assert_to_argument([], x, "")
|
||
|
|
||
|
def test_arg_flag(self):
|
||
|
x = dib.ArgFlag("doit", "do it", arg="-d")
|
||
|
self.assertEqual({"doit": {"type": "boolean"}}, x.to_schema())
|
||
|
|
||
|
# false value
|
||
|
self.assert_to_argument([], x, False)
|
||
|
|
||
|
# true value
|
||
|
self.assert_to_argument(["-d"], x, True)
|
||
|
|
||
|
def test_arg_enum(self):
|
||
|
x = dib.ArgEnum("choice", "", enum=["one", "two", "three"])
|
||
|
self.assertEqual(
|
||
|
{"choice": {"type": "string", "enum": ["one", "two", "three"]}},
|
||
|
x.to_schema(),
|
||
|
)
|
||
|
self.assert_to_argument(["--choice", "one"], x, "one")
|
||
|
self.assert_schema_validate(x, "two")
|
||
|
self.assert_schema_validate(x, "four", assert_failure=True)
|
||
|
|
||
|
def test_arg_flag_repeating(self):
|
||
|
x = dib.ArgFlagRepeating("log_level", "", arg="-v", max_repeat=3)
|
||
|
self.assertEqual(
|
||
|
{"log_level": {"type": "integer", "enum": [0, 1, 2, 3]}},
|
||
|
x.to_schema(),
|
||
|
)
|
||
|
self.assert_to_argument([], x, 0)
|
||
|
self.assert_to_argument(["-v"], x, 1)
|
||
|
self.assert_to_argument(["-v", "-v"], x, 2)
|
||
|
self.assert_to_argument(["-v", "-v", "-v"], x, 3)
|
||
|
|
||
|
def test_arg_int(self):
|
||
|
x = dib.ArgInt("size", "")
|
||
|
self.assertEqual(
|
||
|
{"size": {"type": "integer", "minimum": 1}}, x.to_schema()
|
||
|
)
|
||
|
self.assert_to_argument(["--size", "10"], x, 10)
|
||
|
self.assert_schema_validate(x, -1, assert_failure=True)
|
||
|
|
||
|
def test_arg_list(self):
|
||
|
x = dib.ArgList("packages", "", arg="-p")
|
||
|
self.assertEqual(
|
||
|
{"packages": {"type": "array", "items": {"type": "string"}}},
|
||
|
x.to_schema(),
|
||
|
)
|
||
|
self.assert_to_argument(
|
||
|
["-p", "wget,vim-enhanced"], x, ["wget", "vim-enhanced"]
|
||
|
)
|
||
|
self.assert_to_argument([], x, [])
|
||
|
|
||
|
def test_arg_list_position(self):
|
||
|
x = dib.ArgListPositional("elements", "")
|
||
|
self.assert_to_argument(
|
||
|
["centos", "vm", "bootloader"], x, ["centos", "vm", "bootloader"]
|
||
|
)
|
||
|
|
||
|
def test_arg_enum_list(self):
|
||
|
x = dib.ArgEnumList(
|
||
|
"types", "", arg="-t", enum=["qcow2", "tar", "raw"]
|
||
|
)
|
||
|
self.assertEqual(
|
||
|
{
|
||
|
"types": {
|
||
|
"items": {
|
||
|
"enum": ["qcow2", "tar", "raw"],
|
||
|
"type": "string",
|
||
|
},
|
||
|
"type": "array",
|
||
|
}
|
||
|
},
|
||
|
x.to_schema(),
|
||
|
)
|
||
|
self.assert_to_argument(["-t", "qcow2,raw"], x, ["qcow2", "raw"])
|
||
|
|
||
|
def test_arg_dict_to_string(self):
|
||
|
x = dib.ArgDictToString("options", "")
|
||
|
self.assertEqual(
|
||
|
{
|
||
|
"options": {
|
||
|
"additionalProperties": {"type": "string"},
|
||
|
"type": "object",
|
||
|
}
|
||
|
},
|
||
|
x.to_schema(),
|
||
|
)
|
||
|
self.assert_to_argument(
|
||
|
["--options", "foo=bar,baz=ingo"],
|
||
|
x,
|
||
|
{"foo": "bar", "baz": "ingo"},
|
||
|
)
|
||
|
|
||
|
def test_command_merged_env(self):
|
||
|
c = dib.Command(
|
||
|
"echo",
|
||
|
properties=[dib.Env("environment", "")],
|
||
|
entry={
|
||
|
"environment": {
|
||
|
"ELEMENTS_PATH": "/path/to/elements",
|
||
|
"DIB_CLOUD_IMAGES": "/path/to/image.qcow2",
|
||
|
}
|
||
|
},
|
||
|
)
|
||
|
env = c.merged_env()
|
||
|
self.assertEqual("/path/to/elements", env["ELEMENTS_PATH"])
|
||
|
self.assertEqual("/path/to/image.qcow2", env["DIB_CLOUD_IMAGES"])
|
||
|
self.assertIn(needle="_LIB", haystack=env)
|
||
|
# this will be merged with the whole environment, so there will be
|
||
|
# more than 3 values
|
||
|
self.assertGreater(len(env), 3)
|
||
|
|
||
|
def test_command(self):
|
||
|
c = dib.Command(
|
||
|
"echo",
|
||
|
properties=[
|
||
|
dib.Env("environment", ""),
|
||
|
dib.Arg("the-key", "", arg="--key"),
|
||
|
dib.ArgFlag("doit", "do it", arg="-d"),
|
||
|
dib.ArgFlagRepeating("verbose", "", arg="-v", max_repeat=3),
|
||
|
dib.ArgDictToString("options", ""),
|
||
|
dib.ArgListPositional("elements", ""),
|
||
|
],
|
||
|
entry={
|
||
|
"environment": {
|
||
|
"ELEMENTS_PATH": "/path/to/elements",
|
||
|
"DIB_CLOUD_IMAGES": "~/image.qcow2",
|
||
|
},
|
||
|
"the-key": "the-value",
|
||
|
"doit": True,
|
||
|
"verbose": 3,
|
||
|
"options": {"foo": "bar"},
|
||
|
"elements": ["centos", "vm"],
|
||
|
},
|
||
|
)
|
||
|
self.assertEqual(
|
||
|
[
|
||
|
"bash",
|
||
|
"echo",
|
||
|
"--key",
|
||
|
"the-value",
|
||
|
"-d",
|
||
|
"-v",
|
||
|
"-v",
|
||
|
"-v",
|
||
|
"--options",
|
||
|
"foo=bar",
|
||
|
"centos",
|
||
|
"vm",
|
||
|
],
|
||
|
c.command(),
|
||
|
)
|
||
|
self.assertEqual(
|
||
|
"ELEMENTS_PATH=/path/to/elements "
|
||
|
"DIB_CLOUD_IMAGES='~/image.qcow2' "
|
||
|
"bash echo --key the-value -d -v -v -v --options foo=bar "
|
||
|
"centos vm\n",
|
||
|
str(c),
|
||
|
)
|
||
|
|
||
|
def test_merge_entry(self):
|
||
|
# override normal attributes
|
||
|
merged_entry = {
|
||
|
"imagename": "image1",
|
||
|
"elements": ["one", "two", "three"],
|
||
|
"debug-trace": 1,
|
||
|
}
|
||
|
dib.merge_entry(
|
||
|
merged_entry,
|
||
|
{
|
||
|
"imagename": "image1",
|
||
|
"debug-trace": 2,
|
||
|
"logfile": "image1.log",
|
||
|
},
|
||
|
)
|
||
|
self.assertEqual(
|
||
|
{
|
||
|
"imagename": "image1",
|
||
|
"elements": ["one", "two", "three"],
|
||
|
"debug-trace": 2,
|
||
|
"logfile": "image1.log",
|
||
|
},
|
||
|
merged_entry,
|
||
|
)
|
||
|
|
||
|
# append list attributes, update dict attributes
|
||
|
merged_entry = {
|
||
|
"imagename": "image1",
|
||
|
"elements": ["one", "two", "three"],
|
||
|
"environment": {
|
||
|
"DIB_ONE": "1",
|
||
|
"DIB_TWO": "2",
|
||
|
},
|
||
|
}
|
||
|
dib.merge_entry(
|
||
|
merged_entry,
|
||
|
{
|
||
|
"imagename": "image1",
|
||
|
"elements": ["four", "five"],
|
||
|
"environment": {
|
||
|
"DIB_TWO": "two",
|
||
|
"DIB_THREE": "three",
|
||
|
},
|
||
|
},
|
||
|
)
|
||
|
self.assertEqual(
|
||
|
{
|
||
|
"imagename": "image1",
|
||
|
"elements": ["one", "two", "three", "four", "five"],
|
||
|
"environment": {
|
||
|
"DIB_ONE": "1",
|
||
|
"DIB_TWO": "two",
|
||
|
"DIB_THREE": "three",
|
||
|
},
|
||
|
},
|
||
|
merged_entry,
|
||
|
)
|
||
|
|
||
|
@mock.patch("diskimage_builder.paths.get_path")
|
||
|
def test_build_commands_simple(self, mock_get_path):
|
||
|
mock_get_path.return_value = "/lib"
|
||
|
commands = dib.build_commands(
|
||
|
[
|
||
|
{
|
||
|
"imagename": "centos-minimal",
|
||
|
"elements": ["centos", "vm"],
|
||
|
},
|
||
|
{
|
||
|
"imagename": "ironic-python-agent",
|
||
|
"ramdisk": True,
|
||
|
"elements": [
|
||
|
"ironic-python-agent-ramdisk",
|
||
|
"extra-hardware",
|
||
|
],
|
||
|
},
|
||
|
]
|
||
|
)
|
||
|
|
||
|
self.assertEqual(
|
||
|
[
|
||
|
"bash",
|
||
|
"/lib/disk-image-create",
|
||
|
"-o",
|
||
|
"centos-minimal",
|
||
|
"centos",
|
||
|
"vm",
|
||
|
],
|
||
|
commands[0].command(),
|
||
|
)
|
||
|
self.assertEqual(
|
||
|
[
|
||
|
"bash",
|
||
|
"/lib/ramdisk-image-create",
|
||
|
"-o",
|
||
|
"ironic-python-agent",
|
||
|
"ironic-python-agent-ramdisk",
|
||
|
"extra-hardware",
|
||
|
],
|
||
|
commands[1].command(),
|
||
|
)
|
||
|
|
||
|
@mock.patch("diskimage_builder.paths.get_path")
|
||
|
def test_build_commands_merged(self, mock_get_path):
|
||
|
mock_get_path.return_value = "/lib"
|
||
|
commands = dib.build_commands(
|
||
|
[
|
||
|
{ # base definition
|
||
|
"imagename": "centos-minimal",
|
||
|
"elements": ["centos", "vm"],
|
||
|
"environment": {"foo": "bar", "zip": "zap"},
|
||
|
},
|
||
|
{ # merge with previous when no imagename
|
||
|
"elements": ["devuser"],
|
||
|
"logfile": "centos-minimal.log",
|
||
|
},
|
||
|
{ # merge when same imagename
|
||
|
"imagename": "centos-minimal",
|
||
|
"environment": {"foo": "baz", "one": "two"},
|
||
|
},
|
||
|
]
|
||
|
)
|
||
|
|
||
|
self.assertEqual(1, len(commands))
|
||
|
self.assertEqual(
|
||
|
[
|
||
|
"bash",
|
||
|
"/lib/disk-image-create",
|
||
|
"-o",
|
||
|
"centos-minimal",
|
||
|
"--logfile",
|
||
|
"centos-minimal.log",
|
||
|
"centos",
|
||
|
"vm",
|
||
|
"devuser",
|
||
|
],
|
||
|
commands[0].command(),
|
||
|
)
|
||
|
|
||
|
@mock.patch("diskimage_builder.paths.get_path")
|
||
|
def test_build_commands_all_arguments(self, mock_get_path):
|
||
|
mock_get_path.return_value = "/lib"
|
||
|
commands = dib.build_commands(
|
||
|
[
|
||
|
{
|
||
|
"arch": "amd64",
|
||
|
"imagename": "everyoption",
|
||
|
"types": ["qcow2"],
|
||
|
"debug-trace": 2,
|
||
|
"uncompressed": True,
|
||
|
"clear": True,
|
||
|
"logfile": "./logfile.log",
|
||
|
"checksum": True,
|
||
|
"image-size": 40,
|
||
|
"image-extra-size": 1,
|
||
|
"image-cache": "~/.cache/dib",
|
||
|
"max-online-resize": 1000,
|
||
|
"min-tmpfs": 7,
|
||
|
"mkfs-journal-size": 1,
|
||
|
"mkfs-options": "-D",
|
||
|
"no-tmpfs": True,
|
||
|
"offline": True,
|
||
|
"qemu-img-options": {"size": "10"},
|
||
|
"ramdisk-element": "dracut-ramdisk",
|
||
|
"install-type": "package",
|
||
|
"root-label": "root",
|
||
|
"docker-target": "everyoption:latest",
|
||
|
"packages": ["wget", "tmux"],
|
||
|
"skip-base": True,
|
||
|
"elements": ["centos", "vm"],
|
||
|
}
|
||
|
]
|
||
|
)
|
||
|
|
||
|
self.assertEqual(
|
||
|
[
|
||
|
"bash",
|
||
|
"/lib/disk-image-create",
|
||
|
"-o",
|
||
|
"everyoption",
|
||
|
"-a",
|
||
|
"amd64",
|
||
|
"-t",
|
||
|
"qcow2",
|
||
|
"-x",
|
||
|
"-x",
|
||
|
"-u",
|
||
|
"-c",
|
||
|
"--logfile",
|
||
|
"./logfile.log",
|
||
|
"--checksum",
|
||
|
"--image-size",
|
||
|
"40",
|
||
|
"--image-extra-size",
|
||
|
"1",
|
||
|
"--image-cache",
|
||
|
"~/.cache/dib",
|
||
|
"--max-online-resize",
|
||
|
"1000",
|
||
|
"--min-tmpfs",
|
||
|
"7",
|
||
|
"--mkfs-journal-size",
|
||
|
"1",
|
||
|
"--mkfs-options",
|
||
|
"-D",
|
||
|
"--no-tmpfs",
|
||
|
"--offline",
|
||
|
"--qemu-img-options",
|
||
|
"size=10",
|
||
|
"--root-label",
|
||
|
"root",
|
||
|
"--ramdisk-element",
|
||
|
"dracut-ramdisk",
|
||
|
"--install-type",
|
||
|
"package",
|
||
|
"--docker-target",
|
||
|
"everyoption:latest",
|
||
|
"-p",
|
||
|
"wget,tmux",
|
||
|
"-n",
|
||
|
"centos",
|
||
|
"vm",
|
||
|
],
|
||
|
commands[0].command(),
|
||
|
)
|
||
|
|
||
|
def write_image_definition(self):
|
||
|
image_def = [
|
||
|
{
|
||
|
"imagename": "centos-minimal",
|
||
|
"elements": ["centos", "vm"],
|
||
|
},
|
||
|
{
|
||
|
"imagename": "ironic-python-agent",
|
||
|
"ramdisk": True,
|
||
|
"elements": [
|
||
|
"ironic-python-agent-ramdisk",
|
||
|
"extra-hardware",
|
||
|
],
|
||
|
},
|
||
|
]
|
||
|
with tempfile.NamedTemporaryFile(delete=False) as deffile:
|
||
|
self.addCleanup(os.remove, deffile.name)
|
||
|
self.filelist = [deffile.name]
|
||
|
with open(deffile.name, "w") as f:
|
||
|
f.write(yaml.dump(image_def))
|
||
|
return deffile.name
|
||
|
|
||
|
@mock.patch("diskimage_builder.paths.get_path")
|
||
|
@mock.patch("diskimage_builder.diskimage_builder.get_args")
|
||
|
@mock.patch("subprocess.Popen")
|
||
|
def test_main_dry_run(self, mock_popen, mock_get_args, mock_get_path):
|
||
|
mock_get_path.return_value = "/lib"
|
||
|
mock_get_args.return_value = mock.Mock(
|
||
|
dry_run=True, files=[self.write_image_definition()]
|
||
|
)
|
||
|
dib.main()
|
||
|
mock_popen.assert_not_called()
|
||
|
|
||
|
@mock.patch("diskimage_builder.paths.get_path")
|
||
|
@mock.patch("diskimage_builder.diskimage_builder.get_args")
|
||
|
@mock.patch("subprocess.Popen")
|
||
|
def test_main(self, mock_popen, mock_get_args, mock_get_path):
|
||
|
mock_get_path.return_value = "/lib"
|
||
|
mock_get_args.return_value = mock.Mock(
|
||
|
dry_run=False,
|
||
|
files=[self.write_image_definition()],
|
||
|
stop_on_failure=False,
|
||
|
)
|
||
|
|
||
|
process = mock.Mock()
|
||
|
process.returncode = 0
|
||
|
mock_popen.return_value = process
|
||
|
|
||
|
dib.main()
|
||
|
self.assertEqual(2, mock_popen.call_count)
|
||
|
mock_popen.assert_has_calls(
|
||
|
[
|
||
|
mock.call(
|
||
|
[
|
||
|
"bash",
|
||
|
"/lib/disk-image-create",
|
||
|
"-o",
|
||
|
"centos-minimal",
|
||
|
"centos",
|
||
|
"vm",
|
||
|
],
|
||
|
env=mock.ANY,
|
||
|
),
|
||
|
mock.call(
|
||
|
[
|
||
|
"bash",
|
||
|
"/lib/ramdisk-image-create",
|
||
|
"-o",
|
||
|
"ironic-python-agent",
|
||
|
"ironic-python-agent-ramdisk",
|
||
|
"extra-hardware",
|
||
|
],
|
||
|
env=mock.ANY,
|
||
|
),
|
||
|
],
|
||
|
any_order=True,
|
||
|
)
|
||
|
|
||
|
@mock.patch("diskimage_builder.paths.get_path")
|
||
|
@mock.patch("diskimage_builder.diskimage_builder.get_args")
|
||
|
@mock.patch("subprocess.Popen")
|
||
|
def test_main_stop_on_failure(
|
||
|
self, mock_popen, mock_get_args, mock_get_path
|
||
|
):
|
||
|
mock_get_path.return_value = "/lib"
|
||
|
mock_get_args.return_value = mock.Mock(
|
||
|
dry_run=False,
|
||
|
files=[self.write_image_definition()],
|
||
|
stop_on_failure=True,
|
||
|
)
|
||
|
|
||
|
process = mock.Mock()
|
||
|
process.returncode = -1
|
||
|
mock_popen.return_value = process
|
||
|
|
||
|
e = self.assertRaises(subprocess.CalledProcessError, dib.main)
|
||
|
self.assertEqual(1, mock_popen.call_count)
|
||
|
self.assertEqual(-1, e.returncode)
|
||
|
|
||
|
@mock.patch("diskimage_builder.paths.get_path")
|
||
|
@mock.patch("diskimage_builder.diskimage_builder.get_args")
|
||
|
@mock.patch("subprocess.Popen")
|
||
|
def test_main_continue_on_failure(
|
||
|
self, mock_popen, mock_get_args, mock_get_path
|
||
|
):
|
||
|
mock_get_path.return_value = "/lib"
|
||
|
mock_get_args.return_value = mock.Mock(
|
||
|
dry_run=False,
|
||
|
files=[self.write_image_definition()],
|
||
|
stop_on_failure=False,
|
||
|
)
|
||
|
|
||
|
process = mock.Mock()
|
||
|
process.returncode.side_effect = -1
|
||
|
mock_popen.return_value = process
|
||
|
|
||
|
self.assertRaises(subprocess.CalledProcessError, dib.main)
|
||
|
self.assertEqual(2, mock_popen.call_count)
|