stx-puppet/puppet-manifests/tests/filesystem_mock.py
Lucas Ratusznei Fonseca 36eabd877f Fix ifup error message of vlan that is already configured by kickstart
The first time apply_network_config.py runs in non controllers with mgmt
over VLAN, the pxeboot/mgmt interface is already configured by ifupdown.
When ifup is called for the label that holds the mgmt address, the
following error occurs:
Error: ipv6: address already assigned.
ifup: failed to bring up vlan10:1-22
This happens because since the /etc/network/interfaces.d/auto file does
not exist yet, all the interfaces defined by puppet are considered "new"
and are not required to be set down before ifup.
This change adds logic to check if any of the newly introduced
interfaces are already up, and if it is the case, adds them to the
down list.

Test plan

[PASS] STANDARD IPv6 full install on VirtualBox (2 storages + 1 compute)

Partial-Bug: 2103645
Change-Id: Id7968521606ea54085468b1e55798c75663e7561
Signed-off-by: Lucas Ratusznei Fonseca <lucas.ratuszneifonseca@windriver.com>
2025-04-01 16:27:36 -03:00

319 lines
11 KiB
Python

#
# Copyright (c) 2025 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import io
# Keys for filesystem node properties
PARENT = "parent"
TYPE = "type"
FILE = "file"
DIR = "dir"
LINK = "link"
CONTENTS = "contents"
TARGET = "target"
REF = "ref"
LISTENERS = "listeners"
class FilesystemMockError(BaseException):
pass
class FileMock():
def __init__(self, fs, entry):
self.fs = fs
self.entry = entry
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
def readlines(self):
lines = self.entry[CONTENTS].split("\n")
out_lines = [line + "\n" for line in lines[:-1]]
if len(lines[-1]) > 0:
out_lines.append(lines[-1])
return out_lines
def read(self):
return self.entry[CONTENTS]
def write(self, contents):
if REF not in self.entry:
raise io.UnsupportedOperation("not writable")
self.entry[CONTENTS] += contents
class ReadOnlyFileContainer():
def __init__(self, contents=None):
self.next_id = 0
self.root = self._get_new_dir(None)
if contents:
self.batch_add(contents)
def batch_add(self, contents):
for path, data in contents.items():
if data is None:
self._add_dir(path)
elif type(data) == str:
self._add_file(path, data)
elif type(data) == tuple and len(data) == 1 and type(data[0]) == str:
self._add_link(path, data[0])
else:
raise FilesystemMockError("Invalid entry, must be None for directory, "
"str for file or tuple with 1 str element for link")
def get_root_node(self):
return self.root
@staticmethod
def _get_new_dir(parent):
return {PARENT: parent, TYPE: DIR, CONTENTS: dict()}
@staticmethod
def _get_new_file(parent, contents):
return {PARENT: parent, TYPE: FILE, CONTENTS: contents}
@staticmethod
def _get_new_link(parent, entry, target_path):
return {PARENT: parent, TYPE: LINK, CONTENTS: entry, TARGET: target_path}
def _do_add_dir(self, path_pieces):
def add_dir_rec(parent, pieces):
if len(pieces) == 0:
return parent
current = parent[CONTENTS].get(pieces[0], None)
if not current:
current = self._get_new_dir(parent)
parent[CONTENTS][pieces[0]] = current
return add_dir_rec(current, pieces[1:])
return add_dir_rec(self.root, path_pieces)
def _get_entry(self, path):
pieces = path.split("/")[1:]
def get_entry_rec(parent, pieces):
if len(pieces) == 0:
return parent
current = parent[CONTENTS].get(pieces[0], None)
if not current:
raise FilesystemMockError(f"Path not found: '{path}'")
return get_entry_rec(current, pieces[1:])
return get_entry_rec(self.root, pieces)
def _add_dir(self, path):
pieces = path.split("/")[1:]
self._do_add_dir(pieces)
def _add_file(self, path, contents):
pieces = path.split("/")[1:]
new_dir = self._do_add_dir(pieces[:-1])
file_entry = self._get_new_file(new_dir, contents)
new_dir[CONTENTS][pieces[-1]] = file_entry
def _add_link(self, path, ref_path):
pieces = path.split("/")[1:]
new_dir = self._do_add_dir(pieces[:-1])
ref_entry = self._get_entry(ref_path)
link_entry = self._get_new_link(new_dir, ref_entry, ref_path)
new_dir[CONTENTS][pieces[-1]] = link_entry
class FilesystemMock():
def __init__(self, contents: dict = None, fs: ReadOnlyFileContainer = None):
if fs is not None:
self.fs = fs
add_contents = True
else:
self.fs = ReadOnlyFileContainer(contents)
add_contents = False
self.root = self._get_new_entry(self.fs.get_root_node(), None)
if add_contents and contents:
self.batch_add(contents)
def batch_add(self, contents):
for path, data in contents.items():
if data is None:
self.create_directory(path)
elif type(data) == str:
self.set_file_contents(path, data)
elif type(data) == tuple and len(data) == 1 and type(data[0]) == str:
self.set_link_contents(path, data[0])
else:
raise FilesystemMockError("Invalid entry, must be None for directory, "
"str for file or tuple with 1 str element for link")
@staticmethod
def _get_new_entry(ref, parent, node_type=None):
if not node_type:
node_type = ref[TYPE]
entry = {REF: ref, PARENT: parent, TYPE: node_type}
if node_type == DIR:
entry[CONTENTS] = ref[CONTENTS].copy() if ref else dict()
elif node_type == LINK:
entry[CONTENTS] = ref[CONTENTS] if ref else None
entry[TARGET] = ref[TARGET] if ref else None
else:
entry[CONTENTS] = ''
return entry
def _get_entry(self, path, translate_link=False):
pieces = path.split("/")[1:]
def get_entry_rec(contents, pieces):
if len(pieces) == 0:
if translate_link and contents[TYPE] == LINK:
return contents[CONTENTS]
return contents
if contents[TYPE] == LINK:
contents = contents[CONTENTS]
if REF in contents and contents[CONTENTS] is None:
child = contents[REF][CONTENTS].get(pieces[0], None)
else:
child = contents[CONTENTS].get(pieces[0], None)
if child is None:
return None
return get_entry_rec(child, pieces[1:])
return get_entry_rec(self.root, pieces)
def _patch_entry(self, path, node_type):
pieces = path.split("/")[1:]
def translate_link(entry):
target = entry[CONTENTS]
if REF not in target:
target = self._patch_entry(entry[TARGET], target[TYPE])
entry[CONTENTS] = target
return target
def patch_entry_rec(level, entry, pieces):
if len(pieces) == 0:
if entry[TYPE] == LINK and node_type != LINK:
entry = translate_link(entry)
if entry[TYPE] != node_type:
if node_type == FILE:
raise IsADirectoryError(f"[Errno 21] Is a directory: '{path}'")
raise NotADirectoryError(f"[Errno 20] Not a directory: '{path}'")
return entry
if entry[TYPE] == LINK:
entry = translate_link(entry)
if entry[TYPE] != DIR:
raise NotADirectoryError(f"[Errno 20] Not a directory: '{path}'")
if entry[CONTENTS] is None:
entry[CONTENTS] = entry[REF][CONTENTS].copy()
child = entry[CONTENTS].get(pieces[0], None)
if child is None or REF not in child:
if child is None:
new_type = node_type if len(pieces) == 1 else DIR
child = self._get_new_entry(None, entry, new_type)
else:
child = self._get_new_entry(child, entry)
entry[CONTENTS][pieces[0]] = child
return patch_entry_rec(level + 1, child, pieces[1:])
return patch_entry_rec(0, self.root, pieces)
def exists(self, path):
entry = self._get_entry(path)
return entry is not None
def isfile(self, path):
entry = self._get_entry(path)
return entry and entry[TYPE] == FILE
def isdir(self, path):
entry = self._get_entry(path)
return entry and entry[TYPE] == DIR
def islink(self, path):
entry = self._get_entry(path)
return entry and entry[TYPE] == LINK
def open(self, path, mode="r"):
if "w" in mode:
entry = self._patch_entry(path, FILE)
else:
entry = self._get_entry(path, translate_link=True)
if not entry:
raise FileNotFoundError(f"[Errno 2] No such file or directory: '{path}'")
if entry[TYPE] == DIR:
raise IsADirectoryError(f"[Errno 21] Is a directory: '{path}'")
if "w" in mode:
self._call_listeners(entry)
return FileMock(self, entry)
def _call_listeners(self, entry):
if parent := entry[PARENT]:
self._call_listeners(parent)
if listeners := entry.get(LISTENERS, None):
for listener in listeners:
listener()
def create_directory(self, path):
entry = self._patch_entry(path, DIR)
self._call_listeners(entry)
def set_file_contents(self, path, contents):
entry = self._patch_entry(path, FILE)
entry[CONTENTS] = contents
self._call_listeners(entry)
def get_file_contents(self, path):
entry = self._get_entry(path, translate_link=True)
if entry is None:
raise FilesystemMockError("Path does not exist")
if entry[TYPE] != FILE:
raise FilesystemMockError("Path is not a file")
return entry[CONTENTS]
def set_link_contents(self, link_path, target_path):
target = self._get_entry(target_path)
if target is None:
raise FilesystemMockError("Target path does not exist")
entry = self._patch_entry(link_path, LINK)
entry[CONTENTS] = target
entry[TARGET] = target_path
self._call_listeners(entry)
def listdir(self, path):
entry = self._get_entry(path, translate_link=True)
if entry is None:
raise FilesystemMockError("Path does not exist")
if entry[TYPE] != DIR:
raise FilesystemMockError("Path is not a directory")
files = []
for name, child in entry[CONTENTS].items():
if child[TYPE] == FILE:
files.append(name)
files.sort()
return files
def add_listener(self, path, callback):
entry = self._get_entry(path, translate_link=True)
if entry is None:
raise FilesystemMockError("Path does not exist")
if REF not in entry:
entry = self._patch_entry(path, entry[TYPE])
listeners = entry.setdefault(LISTENERS, list())
listeners.append(callback)
def delete(self, path):
pieces = path.split("/")[1:]
entry = self._get_entry(path)
if entry is None:
raise FileNotFoundError(f"[Errno 2] No such file or directory: '{path}'")
pieces = path.split("/")
patched_entry = self._patch_entry("/".join(pieces[:-1]), DIR)
patched_entry[CONTENTS].pop(pieces[-1])
self._call_listeners(patched_entry)