Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib
diff options
context:
space:
mode:
authorAndreas Baumann <mail@andreasbaumann.cc>2023-04-02 13:50:24 +0200
committerAndreas Baumann <mail@andreasbaumann.cc>2023-04-02 13:50:24 +0200
commitaf7ab9833c9f9944874f0162ae0975175ddc628d (patch)
treec486d4948b5994125f95c8aa1d61a059c1351127 /archinstall/lib
parent2caad35a885f9be30d5bf79a47f5456f276ae67b (diff)
parent6e3c6f8863041b54f6d8cf7af37f9719c493eadd (diff)
Merge branch 'upstreamMaster'
Diffstat (limited to 'archinstall/lib')
-rw-r--r--archinstall/lib/configuration.py5
-rw-r--r--archinstall/lib/disk/encryption.py26
-rw-r--r--archinstall/lib/disk/filesystem.py2
-rw-r--r--archinstall/lib/disk/helpers.py88
-rw-r--r--archinstall/lib/disk/mapperdev.py4
-rw-r--r--archinstall/lib/disk/partition.py133
-rw-r--r--archinstall/lib/disk/validators.py6
-rw-r--r--archinstall/lib/general.py50
-rw-r--r--archinstall/lib/hsm/fido.py21
-rw-r--r--archinstall/lib/installer.py237
-rw-r--r--archinstall/lib/luks.py2
-rw-r--r--archinstall/lib/menu/abstract_menu.py20
-rw-r--r--archinstall/lib/menu/global_menu.py14
-rw-r--r--archinstall/lib/models/disk_encryption.py59
-rw-r--r--archinstall/lib/packages/packages.py3
-rw-r--r--archinstall/lib/systemd.py2
-rw-r--r--archinstall/lib/user_interaction/general_conf.py3
-rw-r--r--archinstall/lib/user_interaction/partitioning_conf.py4
-rw-r--r--archinstall/lib/user_interaction/save_conf.py92
-rw-r--r--archinstall/lib/user_interaction/system_conf.py1
20 files changed, 517 insertions, 255 deletions
diff --git a/archinstall/lib/configuration.py b/archinstall/lib/configuration.py
index ad537b21..c036783f 100644
--- a/archinstall/lib/configuration.py
+++ b/archinstall/lib/configuration.py
@@ -44,7 +44,7 @@ class ConfigurationOutput:
self._disk_layout_file = "user_disk_layout.json"
self._sensitive = ['!users']
- self._ignore = ['abort', 'install', 'config', 'creds', 'dry_run', 'disk_encryption']
+ self._ignore = ['abort', 'install', 'config', 'creds', 'dry_run']
self._process_config()
@@ -71,6 +71,9 @@ class ConfigurationOutput:
else:
self._user_config[key] = self._config[key]
+ if key == 'disk_encryption' and self._config[key]: # special handling for encryption password
+ self._user_credentials['encryption_password'] = self._config[key].encryption_password
+
def user_config_to_json(self) -> str:
return json.dumps({
'config_version': storage['__version__'], # Tells us what version was used to generate the config
diff --git a/archinstall/lib/disk/encryption.py b/archinstall/lib/disk/encryption.py
index 67f656c8..c7496bfa 100644
--- a/archinstall/lib/disk/encryption.py
+++ b/archinstall/lib/disk/encryption.py
@@ -46,7 +46,7 @@ class DiskEncryptionMenu(AbstractSubMenu):
Selector(
_('Partitions'),
func=lambda preset: select_partitions_to_encrypt(self._disk_layouts, preset),
- display_func=lambda x: f'{len(x)} {_("Partitions")}' if x else None,
+ display_func=lambda x: f'{sum([len(y) for y in x.values()])} {_("Partitions")}' if x else None,
dependencies=['encryption_password'],
default=self._preset.partitions,
preview_func=self._prev_disk_layouts,
@@ -86,9 +86,14 @@ class DiskEncryptionMenu(AbstractSubMenu):
def _prev_disk_layouts(self) -> Optional[str]:
selector = self._menu_options['partitions']
if selector.has_selection():
- partitions: List[Any] = selector.current_selection
+ partitions: Dict[str, Any] = selector.current_selection
+
+ all_partitions = []
+ for parts in partitions.values():
+ all_partitions += parts
+
output = str(_('Partitions to be encrypted')) + '\n'
- output += current_partition_layout(partitions, with_title=False)
+ output += current_partition_layout(all_partitions, with_title=False)
return output.rstrip()
return None
@@ -132,7 +137,7 @@ def select_hsm(preset: Optional[Fido2Device] = None) -> Optional[Fido2Device]:
return None
-def select_partitions_to_encrypt(disk_layouts: Dict[str, Any], preset: List[Any]) -> List[Any]:
+def select_partitions_to_encrypt(disk_layouts: Dict[str, Any], preset: Dict[str, Any]) -> Dict[str, Any]:
# If no partitions was marked as encrypted, but a password was supplied and we have some disks to format..
# Then we need to identify which partitions to encrypt. This will default to / (root).
all_partitions = []
@@ -153,10 +158,17 @@ def select_partitions_to_encrypt(disk_layouts: Dict[str, Any], preset: List[Any]
match choice.type_:
case MenuSelectionType.Reset:
- return []
+ return {}
case MenuSelectionType.Skip:
return preset
case MenuSelectionType.Selection:
- return choice.value # type: ignore
+ selections: List[Any] = choice.value # type: ignore
+ partitions = {}
+
+ for path, device in disk_layouts.items():
+ for part in selections:
+ if part in device.get('partitions', []):
+ partitions.setdefault(path, []).append(part)
- return []
+ return partitions
+ return {}
diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py
index bdfa502a..1083df53 100644
--- a/archinstall/lib/disk/filesystem.py
+++ b/archinstall/lib/disk/filesystem.py
@@ -113,7 +113,7 @@ class Filesystem:
format_options = partition.get('options',[]) + partition.get('filesystem',{}).get('format_options',[])
disk_encryption: DiskEncryption = storage['arguments'].get('disk_encryption')
- if partition in disk_encryption.partitions:
+ if disk_encryption and partition in disk_encryption.all_partitions:
if not partition['device_instance']:
raise DiskError(f"Internal error caused us to loose the partition. Please report this issue upstream!")
diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py
index a5164b76..80d0cb53 100644
--- a/archinstall/lib/disk/helpers.py
+++ b/archinstall/lib/disk/helpers.py
@@ -212,6 +212,47 @@ def all_disks() -> List[BlockDevice]:
log(f"[Deprecated] archinstall.all_disks() is deprecated. Use archinstall.all_blockdevices() with the appropriate filters instead.", level=logging.WARNING, fg="yellow")
return all_blockdevices(partitions=False, mappers=False)
+def get_blockdevice_info(device_path, exclude_iso_dev :bool = True) -> Dict[str, Any]:
+ for retry_attempt in range(storage['DISK_RETRY_ATTEMPTS']):
+ partprobe(device_path)
+ time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * retry_attempt))
+
+ try:
+ if exclude_iso_dev:
+ # exclude all devices associated with the iso boot locations
+ iso_devs = ['/run/archiso/airootfs', '/run/archiso/bootmnt']
+
+ try:
+ lsblk_info = get_lsblk_info(device_path)
+ except DiskError:
+ continue
+
+ if any([dev in lsblk_info.mountpoints for dev in iso_devs]):
+ continue
+
+ information = blkid(f'blkid -p -o export {device_path}')
+ return enrich_blockdevice_information(information)
+ except SysCallError as ex:
+ if ex.exit_code == 2:
+ # Assume that it's a loop device, and try to get info on it
+ try:
+ resolved_device_name = device_path.readlink().name
+ except OSError:
+ resolved_device_name = device_path.name
+
+ try:
+ information = get_loop_info(device_path)
+ if not information:
+ raise SysCallError(f"Could not get loop information for {resolved_device_name}", exit_code=1)
+ return enrich_blockdevice_information(information)
+
+ except SysCallError:
+ information = get_blockdevice_uevent(resolved_device_name)
+ return enrich_blockdevice_information(information)
+ else:
+ # We could not reliably get any information, perhaps the disk is clean of information?
+ if retry_attempt == storage['DISK_RETRY_ATTEMPTS'] - 1:
+ raise ex
def all_blockdevices(
mappers: bool = False,
@@ -230,40 +271,18 @@ def all_blockdevices(
# we'll iterate the /sys/class definitions and find the information
# from there.
for block_device in glob.glob("/sys/class/block/*"):
- device_path = pathlib.Path(f"/dev/{pathlib.Path(block_device).readlink().name}")
+ try:
+ device_path = pathlib.Path(f"/dev/{pathlib.Path(block_device).readlink().name}")
+ except FileNotFoundError:
+ log(f"Unknown device found by '/sys/class/block/*', ignoring: {device_path}", level=logging.WARNING, fg="yellow")
if device_path.exists() is False:
log(f"Unknown device found by '/sys/class/block/*', ignoring: {device_path}", level=logging.WARNING, fg="yellow")
continue
- try:
- if exclude_iso_dev:
- # exclude all devices associated with the iso boot locations
- iso_devs = ['/run/archiso/airootfs', '/run/archiso/bootmnt']
- lsblk_info = get_lsblk_info(device_path)
- if any([dev in lsblk_info.mountpoints for dev in iso_devs]):
- continue
-
- information = blkid(f'blkid -p -o export {device_path}')
- except SysCallError as ex:
- if ex.exit_code in (512, 2):
- # Assume that it's a loop device, and try to get info on it
- try:
- information = get_loop_info(device_path)
- if not information:
- print("Exit code for blkid -p -o export was:", ex.exit_code)
- raise SysCallError("Could not get loop information", exit_code=1)
-
- except SysCallError:
- print("Not a loop device, trying uevent rules.")
- information = get_blockdevice_uevent(pathlib.Path(block_device).readlink().name)
- else:
- # We could not reliably get any information, perhaps the disk is clean of information?
- print("Raising ex because:", ex.exit_code)
- raise ex
- # return instances
-
- information = enrich_blockdevice_information(information)
+ information = get_blockdevice_info(device_path)
+ if not information:
+ continue
for path, path_info in information.items():
if path_info.get('DMCRYPT_NAME'):
@@ -409,7 +428,6 @@ def get_partitions_in_use(mountpoint :str) -> Dict[str, Any]:
return {}
output = json.loads(output)
- # print(output)
mounts = {}
@@ -421,11 +439,13 @@ def get_partitions_in_use(mountpoint :str) -> Dict[str, Any]:
continue
if isinstance(blockdev, Partition):
- for blockdev_mountpoint in blockdev.mountpoints:
- block_devices_mountpoints[blockdev_mountpoint] = blockdev
+ if blockdev.mountpoints:
+ for blockdev_mountpoint in blockdev.mountpoints:
+ block_devices_mountpoints[blockdev_mountpoint] = blockdev
else:
- for blockdev_mountpoint in blockdev.mount_information:
- block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev
+ if blockdev.mount_information:
+ for blockdev_mountpoint in blockdev.mount_information:
+ block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev
log(f'Filtering available mounts {block_devices_mountpoints} to those under {mountpoint}', level=logging.DEBUG)
diff --git a/archinstall/lib/disk/mapperdev.py b/archinstall/lib/disk/mapperdev.py
index 71ef2a79..bf1b3583 100644
--- a/archinstall/lib/disk/mapperdev.py
+++ b/archinstall/lib/disk/mapperdev.py
@@ -25,6 +25,10 @@ class MapperDev:
return f"/dev/mapper/{self.mappername}"
@property
+ def part_uuid(self):
+ return self.partition.part_uuid
+
+ @property
def partition(self):
from .helpers import uevent, get_parent_of_partition
from .partition import Partition
diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py
index 9febf102..87eaa6a7 100644
--- a/archinstall/lib/disk/partition.py
+++ b/archinstall/lib/disk/partition.py
@@ -98,17 +98,18 @@ class Partition:
if mountpoint:
self.mount(mountpoint)
- self._partition_info = self._fetch_information()
-
- if not autodetect_filesystem and filesystem:
- self._partition_info.filesystem_type = filesystem
+ try:
+ self._partition_info = self._fetch_information()
+
+ if not autodetect_filesystem and filesystem:
+ self._partition_info.filesystem_type = filesystem
- if self._partition_info.filesystem_type == 'crypto_LUKS':
- self._encrypted = True
+ if self._partition_info.filesystem_type == 'crypto_LUKS':
+ self._encrypted = True
+ except DiskError:
+ self._partition_info = None
- # I hate doint this but I'm currently unsure where this
- # is acutally used to be able to fix the typing issues properly
- @typing.no_type_check
+ @typing.no_type_check # I hate doint this but I'm currently unsure where this is used.
def __lt__(self, left_comparitor :BlockDevice) -> bool:
if type(left_comparitor) == Partition:
left_comparitor = left_comparitor.path
@@ -120,14 +121,17 @@ class Partition:
def __repr__(self, *args :str, **kwargs :str) -> str:
mount_repr = ''
- if mountpoint := self._partition_info.get_first_mountpoint():
- mount_repr = f", mounted={mountpoint}"
- elif self._target_mountpoint:
- mount_repr = f", rel_mountpoint={self._target_mountpoint}"
+ if self._partition_info:
+ if mountpoint := self._partition_info.get_first_mountpoint():
+ mount_repr = f", mounted={mountpoint}"
+ elif self._target_mountpoint:
+ mount_repr = f", rel_mountpoint={self._target_mountpoint}"
classname = self.__class__.__name__
- if self._encrypted:
+ if not self._partition_info:
+ return f'{classname}(path={self._path})'
+ elif self._encrypted:
return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, parent={self.real_device}, fs={self._partition_info.filesystem_type}{mount_repr})'
else:
return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, fs={self._partition_info.filesystem_type}{mount_repr})'
@@ -146,7 +150,7 @@ class Partition:
'encrypted': self._encrypted,
'start': self.start,
'size': self.end,
- 'filesystem': self._partition_info.filesystem_type
+ 'filesystem': self._partition_info.filesystem_type if self._partition_info else 'Unknown'
}
return partition_info
@@ -164,34 +168,37 @@ class Partition:
'start': self.start,
'size': self.end,
'filesystem': {
- 'format': self._partition_info.filesystem_type
+ 'format': self._partition_info.filesystem_type if self._partition_info else 'None'
}
}
def _call_lsblk(self) -> Dict[str, Any]:
- self.partprobe()
- # This sleep might be overkill, but lsblk is known to
- # work against a chaotic cache that can change during call
- # causing no information to be returned (blkid is better)
- # time.sleep(1)
+ for retry_attempt in range(storage['DISK_RETRY_ATTEMPTS']):
+ self.partprobe()
+ time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * retry_attempt)) # TODO: Remove, we should be relying on blkid instead of lsblk
+ # This sleep might be overkill, but lsblk is known to
+ # work against a chaotic cache that can change during call
+ # causing no information to be returned (blkid is better)
+ # time.sleep(1)
- # TODO: Maybe incorporate a re-try system here based on time.sleep(max(0.1, storage.get('DISK_TIMEOUTS', 1)))
+ # TODO: Maybe incorporate a re-try system here based on time.sleep(max(0.1, storage.get('DISK_TIMEOUTS', 1)))
- try:
- output = SysCommand(f"lsblk --json -b -o+LOG-SEC,SIZE,PTTYPE,PARTUUID,UUID,FSTYPE {self.device_path}").decode('UTF-8')
- except SysCallError as error:
- # It appears as if lsblk can return exit codes like 8192 to indicate something.
- # But it does return output so we'll try to catch it.
- output = error.worker.decode('UTF-8')
-
- if output:
try:
- lsblk_info = json.loads(output)
- return lsblk_info
- except json.decoder.JSONDecodeError:
- log(f"Could not decode JSON: {output}", fg="red", level=logging.ERROR)
-
- raise DiskError(f'Failed to read disk "{self.device_path}" with lsblk')
+ output = SysCommand(f"lsblk --json -b -o+LOG-SEC,SIZE,PTTYPE,PARTUUID,UUID,FSTYPE {self.device_path}").decode('UTF-8')
+ except SysCallError as error:
+ # Get the output minus the message/info from lsblk if it returns a non-zero exit code.
+ output = error.worker.decode('UTF-8')
+ if '{' in output:
+ output = output[output.find('{'):]
+
+ if output:
+ try:
+ lsblk_info = json.loads(output)
+ return lsblk_info
+ except json.decoder.JSONDecodeError:
+ log(f"Could not decode JSON: {output}", fg="red", level=logging.ERROR)
+
+ raise DiskError(f'Failed to get partition information "{self.device_path}" with lsblk')
def _call_sfdisk(self) -> Dict[str, Any]:
output = SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')
@@ -212,9 +219,12 @@ class Partition:
lsblk_info = self._call_lsblk()
sfdisk_info = self._call_sfdisk()
- if not (device := lsblk_info.get('blockdevices', [None])[0]):
+ if not (device := lsblk_info.get('blockdevices', [])):
raise DiskError(f'Failed to retrieve information for "{self.device_path}" with lsblk')
+ # Grab the first (and only) block device in the list as we're targeting a specific partition
+ device = device[0]
+
mountpoints = [Path(mountpoint) for mountpoint in device['mountpoints'] if mountpoint]
bootable = sfdisk_info.get('bootable', False) or sfdisk_info.get('type', '') == 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B'
@@ -243,7 +253,8 @@ class Partition:
@property
def filesystem(self) -> str:
- return self._partition_info.filesystem_type
+ if self._partition_info:
+ return self._partition_info.filesystem_type
@property
def mountpoint(self) -> Optional[Path]:
@@ -253,43 +264,51 @@ class Partition:
@property
def mountpoints(self) -> List[Path]:
- return self._partition_info.mountpoints
+ if self._partition_info:
+ return self._partition_info.mountpoints
@property
def sector_size(self) -> int:
- return self._partition_info.sector_size
+ if self._partition_info:
+ return self._partition_info.sector_size
@property
def start(self) -> Optional[int]:
- return self._partition_info.start
+ if self._partition_info:
+ return self._partition_info.start
@property
def end(self) -> Optional[int]:
- return self._partition_info.end
+ if self._partition_info:
+ return self._partition_info.end
@property
def end_sectors(self) -> Optional[int]:
- start = self._partition_info.start
- end = self._partition_info.end
- if start and end:
- return start + end
- return None
+ if self._partition_info:
+ start = self._partition_info.start
+ end = self._partition_info.end
+ if start and end:
+ return start + end
@property
def size(self) -> Optional[float]:
- return self._partition_info.size
+ if self._partition_info:
+ return self._partition_info.size
@property
def boot(self) -> bool:
- return self._partition_info.bootable
+ if self._partition_info:
+ return self._partition_info.bootable
@property
def partition_type(self) -> Optional[str]:
- return self._partition_info.pttype
+ if self._partition_info:
+ return self._partition_info.pttype
@property
def part_uuid(self) -> str:
- return self._partition_info.partuuid
+ if self._partition_info:
+ return self._partition_info.partuuid
@property
def uuid(self) -> Optional[str]:
@@ -355,7 +374,8 @@ class Partition:
log(f"Could not get PARTUUID of partition using 'blkid -s PARTUUID -o value {self.device_path}': {error}")
- return self._partition_info.uuid
+ if self._partition_info:
+ return self._partition_info.uuid
@property
def encrypted(self) -> Union[bool, None]:
@@ -611,14 +631,7 @@ class Partition:
return False
def unmount(self) -> bool:
- worker = SysCommand(f"/usr/bin/umount {self._path}")
- exit_code = worker.exit_code
-
- # Without to much research, it seams that low error codes are errors.
- # And above 8k is indicators such as "/dev/x not mounted.".
- # So anything in between 0 and 8k are errors (?).
- if exit_code and 0 < exit_code < 8000:
- raise SysCallError(f"Could not unmount {self._path} properly: {worker}", exit_code=exit_code)
+ SysCommand(f"/usr/bin/umount {self._path}")
# Update the partition info since the mount info has changed after this call.
self._partition_info = self._fetch_information()
diff --git a/archinstall/lib/disk/validators.py b/archinstall/lib/disk/validators.py
index 54808886..076a8ba2 100644
--- a/archinstall/lib/disk/validators.py
+++ b/archinstall/lib/disk/validators.py
@@ -7,10 +7,12 @@ def valid_parted_position(pos :str) -> bool:
if pos.isdigit():
return True
- if pos.lower().endswith('b') and pos[:-1].isdigit():
+ pos_lower = pos.lower()
+
+ if (pos_lower.endswith('b') or pos_lower.endswith('s')) and pos[:-1].isdigit():
return True
- if any(pos.lower().endswith(size) and pos[:-len(size)].replace(".", "", 1).isdigit()
+ if any(pos_lower.endswith(size) and pos[:-len(size)].replace(".", "", 1).isdigit()
for size in ['%', 'kb', 'mb', 'gb', 'tb', 'kib', 'mib', 'gib', 'tib']):
return True
diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py
index 8c7aed91..79ab024b 100644
--- a/archinstall/lib/general.py
+++ b/archinstall/lib/general.py
@@ -130,7 +130,10 @@ class JsonEncoder:
copy[JsonEncoder._encode(key)] = val
return copy
elif hasattr(obj, 'json'):
- return obj.json()
+ # json() is a friendly name for json-helper, it should return
+ # a dictionary representation of the object so that it can be
+ # processed by the json library.
+ return json.loads(json.dumps(obj.json(), cls=JSON))
elif hasattr(obj, '__dump__'):
return obj.__dump__()
elif isinstance(obj, (datetime, date)):
@@ -185,12 +188,16 @@ class SysCommandWorker:
def __init__(self,
cmd :Union[str, List[str]],
callbacks :Optional[Dict[str, Any]] = None,
+ peek_output :Optional[bool] = False,
peak_output :Optional[bool] = False,
environment_vars :Optional[Dict[str, Any]] = None,
logfile :Optional[None] = None,
working_directory :Optional[str] = './',
remove_vt100_escape_codes_from_lines :bool = True):
+ if peak_output:
+ log("SysCommandWorker()'s peak_output is deprecated, use peek_output instead.", level=logging.WARNING, fg='red')
+
if not callbacks:
callbacks = {}
if not environment_vars:
@@ -208,7 +215,9 @@ class SysCommandWorker:
self.cmd = cmd
self.callbacks = callbacks
- self.peak_output = peak_output
+ self.peek_output = peek_output
+ if not self.peek_output and peak_output:
+ self.peek_output = peak_output
# define the standard locale for command outputs. For now the C ascii one. Can be overridden
self.environment_vars = {**storage.get('CMD_LOCALE',{}),**environment_vars}
self.logfile = logfile
@@ -262,7 +271,7 @@ class SysCommandWorker:
except:
pass
- if self.peak_output:
+ if self.peek_output:
# To make sure any peaked output didn't leave us hanging
# on the same line we were on.
sys.stdout.write("\n")
@@ -307,7 +316,7 @@ class SysCommandWorker:
self._trace_log_pos = min(max(0, pos), len(self._trace_log))
def peak(self, output: Union[str, bytes]) -> bool:
- if self.peak_output:
+ if self.peek_output:
if type(output) == bytes:
try:
output = output.decode('UTF-8')
@@ -320,8 +329,8 @@ class SysCommandWorker:
if peak_logfile.exists() is False:
change_perm = True
- with peak_logfile.open("a") as peak_output_log:
- peak_output_log.write(output)
+ with peak_logfile.open("a") as peek_output_log:
+ peek_output_log.write(output)
if change_perm:
os.chmod(str(peak_logfile), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
@@ -349,10 +358,12 @@ class SysCommandWorker:
if self.ended or (got_output is False and pid_exists(self.pid) is False):
self.ended = time.time()
try:
- self.exit_code = os.waitpid(self.pid, 0)[1]
+ wait_status = os.waitpid(self.pid, 0)[1]
+ self.exit_code = os.waitstatus_to_exitcode(wait_status)
except ChildProcessError:
try:
- self.exit_code = os.waitpid(self.child_fd, 0)[1]
+ wait_status = os.waitpid(self.child_fd, 0)[1]
+ self.exit_code = os.waitstatus_to_exitcode(wait_status)
except ChildProcessError:
self.exit_code = 1
@@ -385,6 +396,13 @@ class SysCommandWorker:
os.chmod(str(history_logfile), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
except PermissionError:
pass
+ # If history_logfile does not exist, ignore the error
+ except FileNotFoundError:
+ pass
+ except Exception as e:
+ exception_type = type(e).__name__
+ log(f"Unexpected {exception_type} occurred in {self.cmd}: {e}", level=logging.ERROR)
+ raise e
os.execve(self.cmd[0], list(self.cmd), {**os.environ, **self.environment_vars})
if storage['arguments'].get('debug'):
@@ -412,11 +430,15 @@ class SysCommand:
cmd :Union[str, List[str]],
callbacks :Optional[Dict[str, Callable[[Any], Any]]] = None,
start_callback :Optional[Callable[[Any], Any]] = None,
+ peek_output :Optional[bool] = False,
peak_output :Optional[bool] = False,
environment_vars :Optional[Dict[str, Any]] = None,
working_directory :Optional[str] = './',
remove_vt100_escape_codes_from_lines :bool = True):
+ if peak_output:
+ log("SysCommandWorker()'s peak_output is deprecated, use peek_output instead.", level=logging.WARNING, fg='red')
+
_callbacks = {}
if callbacks:
for hook, func in callbacks.items():
@@ -426,7 +448,9 @@ class SysCommand:
self.cmd = cmd
self._callbacks = _callbacks
- self.peak_output = peak_output
+ self.peek_output = peek_output
+ if not self.peek_output and peak_output:
+ self.peek_output = peak_output
self.environment_vars = environment_vars
self.working_directory = working_directory
self.remove_vt100_escape_codes_from_lines = remove_vt100_escape_codes_from_lines
@@ -469,7 +493,7 @@ class SysCommand:
return {
'cmd': self.cmd,
'callbacks': self._callbacks,
- 'peak': self.peak_output,
+ 'peak': self.peek_output,
'environment_vars': self.environment_vars,
'session': True if self.session else False
}
@@ -478,7 +502,7 @@ class SysCommand:
"""
Initiates a :ref:`SysCommandWorker` session in this class ``.session``.
It then proceeds to poll the process until it ends, after which it also
- clears any printed output if ``.peak_output=True``.
+ clears any printed output if ``.peek_output=True``.
"""
if self.session:
return self.session
@@ -486,7 +510,7 @@ class SysCommand:
with SysCommandWorker(
self.cmd,
callbacks=self._callbacks,
- peak_output=self.peak_output,
+ peek_output=self.peek_output,
environment_vars=self.environment_vars,
remove_vt100_escape_codes_from_lines=self.remove_vt100_escape_codes_from_lines,
working_directory=self.working_directory) as session:
@@ -497,7 +521,7 @@ class SysCommand:
while self.session.ended is None:
self.session.poll()
- if self.peak_output:
+ if self.peek_output:
sys.stdout.write('\n')
sys.stdout.flush()
diff --git a/archinstall/lib/hsm/fido.py b/archinstall/lib/hsm/fido.py
index 4cd956a3..1c226322 100644
--- a/archinstall/lib/hsm/fido.py
+++ b/archinstall/lib/hsm/fido.py
@@ -1,9 +1,11 @@
+from __future__ import annotations
+
import getpass
import logging
from dataclasses import dataclass
from pathlib import Path
-from typing import List
+from typing import List, Dict
from ..general import SysCommand, SysCommandWorker, clear_vt100_escape_codes
from ..disk.partition import Partition
@@ -16,6 +18,21 @@ class Fido2Device:
manufacturer: str
product: str
+ def json(self) -> Dict[str, str]:
+ return {
+ 'path': str(self.path),
+ 'manufacturer': self.manufacturer,
+ 'product': self.product
+ }
+
+ @classmethod
+ def parse_arg(cls, arg: Dict[str, str]) -> 'Fido2Device':
+ return Fido2Device(
+ Path(arg['path']),
+ arg['manufacturer'],
+ arg['product']
+ )
+
class Fido2:
_loaded: bool = False
@@ -76,7 +93,7 @@ class Fido2:
@classmethod
def fido2_enroll(cls, hsm_device: Fido2Device, partition :Partition, password :str):
- worker = SysCommandWorker(f"systemd-cryptenroll --fido2-device={hsm_device.path} {partition.real_device}", peak_output=True)
+ worker = SysCommandWorker(f"systemd-cryptenroll --fido2-device={hsm_device.path} {partition.real_device}", peek_output=True)
pw_inputted = False
pin_inputted = False
diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py
index 1926f593..f1c7b3db 100644
--- a/archinstall/lib/installer.py
+++ b/archinstall/lib/installer.py
@@ -132,6 +132,7 @@ class Installer:
# if HSM is not used to encrypt the root volume. Check mkinitcpio() function for that override.
self.HOOKS = ["base", "systemd", "autodetect", "keyboard", "sd-vconsole", "modconf", "block", "filesystems", "fsck"]
self.KERNEL_PARAMS = []
+ self.FSTAB_ENTRIES = []
self._zram_enabled = False
@@ -198,7 +199,7 @@ class Installer:
def _create_keyfile(self,luks_handle , partition :dict, password :str):
""" roiutine to create keyfiles, so it can be moved elsewhere
"""
- if self._disk_encryption.generate_encryption_file(partition):
+ if self._disk_encryption and self._disk_encryption.generate_encryption_file(partition):
if not (cryptkey_dir := pathlib.Path(f"{self.target}/etc/cryptsetup-keys.d")).exists():
cryptkey_dir.mkdir(parents=True)
# Once we store the key as ../xyzloop.key systemd-cryptsetup can automatically load this key
@@ -246,20 +247,21 @@ class Installer:
mount_queue = {}
# we manage the encrypted partititons
- for partition in self._disk_encryption.partitions:
- # open the luks device and all associate stuff
- loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}"
-
- # note that we DON'T auto_unmount (i.e. close the encrypted device so it can be used
- with (luks_handle := luks2(partition['device_instance'], loopdev, self._disk_encryption.encryption_password, auto_unmount=False)) as unlocked_device:
- if self._disk_encryption.generate_encryption_file(partition) and not self._has_root(partition):
- list_luks_handles.append([luks_handle, partition, self._disk_encryption.encryption_password])
- # this way all the requesrs will be to the dm_crypt device and not to the physical partition
- partition['device_instance'] = unlocked_device
-
- if self._has_root(partition) and self._disk_encryption.generate_encryption_file(partition) is False:
- if self._disk_encryption.hsm_device:
- Fido2.fido2_enroll(self._disk_encryption.hsm_device, partition['device_instance'], self._disk_encryption.encryption_password)
+ if self._disk_encryption:
+ for partition in self._disk_encryption.all_partitions:
+ # open the luks device and all associate stuff
+ loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}"
+
+ # note that we DON'T auto_unmount (i.e. close the encrypted device so it can be used
+ with (luks_handle := luks2(partition['device_instance'], loopdev, self._disk_encryption.encryption_password, auto_unmount=False)) as unlocked_device:
+ if self._disk_encryption.generate_encryption_file(partition) and not self._has_root(partition):
+ list_luks_handles.append([luks_handle, partition, self._disk_encryption.encryption_password])
+ # this way all the requesrs will be to the dm_crypt device and not to the physical partition
+ partition['device_instance'] = unlocked_device
+
+ if self._has_root(partition) and self._disk_encryption.generate_encryption_file(partition) is False:
+ if self._disk_encryption.hsm_device:
+ Fido2.fido2_enroll(self._disk_encryption.hsm_device, partition['device_instance'], self._disk_encryption.encryption_password)
btrfs_subvolumes = [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', [])]
@@ -292,7 +294,7 @@ class Installer:
else:
mount_queue[mountpoint] = lambda instance=partition['device_instance'], target=f"{self.target}{mountpoint}": instance.mount(target)
- log(f"Using mount order: {list(sorted(mount_queue.items(), key=lambda item: item[0]))}", level=logging.INFO, fg="white")
+ log(f"Using mount order: {list(sorted(mount_queue.items(), key=lambda item: item[0]))}", level=logging.DEBUG, fg="white")
# We mount everything by sorting on the mountpoint itself.
for mountpoint, frozen_func in sorted(mount_queue.items(), key=lambda item: item[0]):
@@ -317,6 +319,26 @@ class Installer:
partition.mount(f'{self.target}{mountpoint}', options=options)
+ def add_swapfile(self, size='4G', enable_resume=True, file='/swapfile'):
+ if file[:1] != '/':
+ file = f"/{file}"
+ if len(file.strip()) <= 0 or file == '/':
+ raise ValueError(f"The filename for the swap file has to be a valid path, not: {self.target}{file}")
+
+ SysCommand(f'dd if=/dev/zero of={self.target}{file} bs={size} count=1')
+ SysCommand(f'chmod 0600 {self.target}{file}')
+ SysCommand(f'mkswap {self.target}{file}')
+
+ self.FSTAB_ENTRIES.append(f'{file} none swap defaults 0 0')
+
+ if enable_resume:
+ resume_uuid = SysCommand(f'findmnt -no UUID -T {self.target}{file}').decode('UTF-8').strip()
+ resume_offset = SysCommand(f'/usr/bin/filefrag -v {self.target}{file}').decode('UTF-8').split('0:', 1)[1].split(":", 1)[1].split("..", 1)[0].strip()
+
+ self.HOOKS.append('resume')
+ self.KERNEL_PARAMS.append(f'resume=UUID={resume_uuid}')
+ self.KERNEL_PARAMS.append(f'resume_offset={resume_offset}')
+
def post_install_check(self, *args :str, **kwargs :str) -> List[str]:
return [step for step, flag in self.helper_flags.items() if flag is False]
@@ -396,7 +418,8 @@ class Installer:
raise RequirementError(f'Could not sync mirrors: {error}', level=logging.ERROR, fg="red")
try:
- return SysCommand(f'/usr/bin/pacstrap -C /etc/pacman.conf {self.target} {" ".join(packages)} --noconfirm', peak_output=True).exit_code == 0
+ SysCommand(f'/usr/bin/pacstrap -C /etc/pacman.conf -K {self.target} {" ".join(packages)} --noconfirm', peek_output=True)
+ return True
except SysCallError as error:
self.log(f'Could not strap in packages: {error}', level=logging.ERROR, fg="red")
@@ -417,8 +440,10 @@ class Installer:
def genfstab(self, flags :str = '-pU') -> bool:
self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO)
- if not (fstab := SysCommand(f'/usr/bin/genfstab {flags} {self.target}')).exit_code == 0:
- raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n Error: {fstab}')
+ try:
+ fstab = SysCommand(f'/usr/bin/genfstab {flags} {self.target}')
+ except SysCallError as error:
+ raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n Error: {error}')
with open(f"{self.target}/etc/fstab", 'a') as fstab_fh:
fstab_fh.write(fstab.decode())
@@ -431,6 +456,10 @@ class Installer:
if plugin.on_genfstab(self) is True:
break
+ with open(f"{self.target}/etc/fstab", 'a') as fstab_fh:
+ for entry in self.FSTAB_ENTRIES:
+ fstab_fh.write(f'{entry}\n')
+
return True
def set_hostname(self, hostname: str, *args :str, **kwargs :str) -> None:
@@ -463,7 +492,11 @@ class Installer:
with open(f'{self.target}/etc/locale.conf', 'w') as fh:
fh.write(f'LANG={locale}.{encoding}{modifier}\n')
- return True if SysCommand(f'/usr/bin/arch-chroot {self.target} locale-gen').exit_code == 0 else False
+ try:
+ SysCommand(f'/usr/bin/arch-chroot {self.target} locale-gen')
+ return True
+ except SysCallError:
+ return False
def set_timezone(self, zone :str, *args :str, **kwargs :str) -> bool:
if not zone:
@@ -519,8 +552,10 @@ class Installer:
def enable_service(self, *services :str) -> None:
for service in services:
self.log(f'Enabling service {service}', level=logging.INFO)
- if (output := self.arch_chroot(f'systemctl enable {service}')).exit_code != 0:
- raise ServiceException(f"Unable to start service {service}: {output}")
+ try:
+ self.arch_chroot(f'systemctl enable {service}')
+ except SysCallError as error:
+ raise ServiceException(f"Unable to start service {service}: {error}")
for plugin in plugins.values():
if hasattr(plugin, 'on_service'):
@@ -641,12 +676,17 @@ class Installer:
if plugin.on_mkinitcpio(self):
return True
+ # mkinitcpio will error out if there's no vconsole.
+ if (vconsole := pathlib.Path(f"{self.target}/etc/vconsole.conf")).exists() is False:
+ with vconsole.open('w') as fh:
+ fh.write(f"KEYMAP={storage['arguments']['keyboard-layout']}\n")
+
with open(f'{self.target}/etc/mkinitcpio.conf', 'w') as mkinit:
mkinit.write(f"MODULES=({' '.join(self.MODULES)})\n")
mkinit.write(f"BINARIES=({' '.join(self.BINARIES)})\n")
mkinit.write(f"FILES=({' '.join(self.FILES)})\n")
- if not self._disk_encryption.hsm_device:
+ if self._disk_encryption and not self._disk_encryption.hsm_device:
# For now, if we don't use HSM we revert to the old
# way of setting up encryption hooks for mkinitcpio.
# This is purely for stability reasons, we're going away from this.
@@ -656,7 +696,11 @@ class Installer:
mkinit.write(f"HOOKS=({' '.join(self.HOOKS)})\n")
- return SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}').exit_code == 0
+ try:
+ SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}')
+ return True
+ except SysCallError:
+ return False
def minimal_installation(
self, testing: bool = False, multilib: bool = False,
@@ -690,7 +734,7 @@ class Installer:
self.HOOKS.remove('fsck')
if self.detect_encryption(partition):
- if self._disk_encryption.hsm_device:
+ if self._disk_encryption and self._disk_encryption.hsm_device:
# Required bby mkinitcpio to add support for fido2-device options
self.pacstrap('libfido2')
@@ -754,14 +798,6 @@ class Installer:
# TODO: Use python functions for this
SysCommand(f'/usr/bin/arch-chroot {self.target} chmod 700 /root')
- if self._disk_encryption.hsm_device:
- # TODO:
- # A bit of a hack, but we need to get vconsole.conf in there
- # before running `mkinitcpio` because it expects it in HSM mode.
- if (vconsole := pathlib.Path(f"{self.target}/etc/vconsole.conf")).exists() is False:
- with vconsole.open('w') as fh:
- fh.write(f"KEYMAP={storage['arguments']['keyboard-layout']}\n")
-
self.mkinitcpio('-P')
self.helper_flags['base'] = True
@@ -841,60 +877,68 @@ class Installer:
os.makedirs(f'{self.target}/boot/loader/entries')
for kernel in self.kernels:
- # Setup the loader entry
- with open(f'{self.target}/boot/loader/entries/{self.init_time}_{kernel}.conf', 'w') as entry:
- entry.write('# Created by: archinstall\n')
- entry.write(f'# Created on: {self.init_time}\n')
- entry.write(f'title Arch Linux ({kernel})\n')
- entry.write(f"linux /vmlinuz-{kernel}\n")
- if not is_vm():
- vendor = cpu_vendor()
- if vendor == "AuthenticAMD":
- entry.write("initrd /amd-ucode.img\n")
- elif vendor == "GenuineIntel":
- entry.write("initrd /intel-ucode.img\n")
+ for variant in ("", "-fallback"):
+ # Setup the loader entry
+ with open(f'{self.target}/boot/loader/entries/{self.init_time}_{kernel}{variant}.conf', 'w') as entry:
+ entry.write('# Created by: archinstall\n')
+ entry.write(f'# Created on: {self.init_time}\n')
+ entry.write(f'title Arch Linux ({kernel}{variant})\n')
+ entry.write(f"linux /vmlinuz-{kernel}\n")
+ if not is_vm():
+ vendor = cpu_vendor()
+ if vendor == "AuthenticAMD":
+ entry.write("initrd /amd-ucode.img\n")
+ elif vendor == "GenuineIntel":
+ entry.write("initrd /intel-ucode.img\n")
+ else:
+ self.log(f"Unknown CPU vendor '{vendor}' detected. Archinstall won't add any ucode to systemd-boot config.", level=logging.DEBUG)
+ entry.write(f"initrd /initramfs-{kernel}{variant}.img\n")
+ # blkid doesn't trigger on loopback devices really well,
+ # so we'll use the old manual method until we get that sorted out.
+ root_fs_type = get_mount_fs_type(root_partition.filesystem)
+
+ if root_fs_type is not None:
+ options_entry = f'rw rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}\n'
else:
- self.log(f"Unknown CPU vendor '{vendor}' detected. Archinstall won't add any ucode to systemd-boot config.", level=logging.DEBUG)
- entry.write(f"initrd /initramfs-{kernel}.img\n")
- # blkid doesn't trigger on loopback devices really well,
- # so we'll use the old manual method until we get that sorted out.
- root_fs_type = get_mount_fs_type(root_partition.filesystem)
-
- if root_fs_type is not None:
- options_entry = f'rw rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}\n'
- else:
- options_entry = f'rw {" ".join(self.KERNEL_PARAMS)}\n'
+ options_entry = f'rw {" ".join(self.KERNEL_PARAMS)}\n'
- for subvolume in root_partition.subvolumes:
- if subvolume.root is True and subvolume.name != '<FS_TREE>':
- options_entry = f"rootflags=subvol={subvolume.name} " + options_entry
+ for subvolume in root_partition.subvolumes:
+ if subvolume.root is True and subvolume.name != '<FS_TREE>':
+ options_entry = f"rootflags=subvol={subvolume.name} " + options_entry
- # Zswap should be disabled when using zram.
- #
- # https://github.com/archlinux/archinstall/issues/881
- if self._zram_enabled:
- options_entry = "zswap.enabled=0 " + options_entry
+ # Zswap should be disabled when using zram.
+ #
+ # https://github.com/archlinux/archinstall/issues/881
+ if self._zram_enabled:
+ options_entry = "zswap.enabled=0 " + options_entry
- if real_device := self.detect_encryption(root_partition):
- # TODO: We need to detect if the encrypted device is a whole disk encryption,
- # or simply a partition encryption. Right now we assume it's a partition (and we always have)
- log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}/{real_device.part_uuid}'.", level=logging.DEBUG)
+ if real_device := self.detect_encryption(root_partition):
+ # TODO: We need to detect if the encrypted device is a whole disk encryption,
+ # or simply a partition encryption. Right now we assume it's a partition (and we always have)
+ log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}/{real_device.part_uuid}'.", level=logging.DEBUG)
- kernel_options = f"options"
+ kernel_options = f"options"
- if self._disk_encryption.hsm_device:
+ if self._disk_encryption.hsm_device:
+ # Note: lsblk UUID must be used, not PARTUUID for sd-encrypt to work
+ kernel_options += f" rd.luks.name={real_device.uuid}=luksdev"
+ # Note: tpm2-device and fido2-device don't play along very well:
+ # https://github.com/archlinux/archinstall/pull/1196#issuecomment-1129715645
+ kernel_options += f" rd.luks.options=fido2-device=auto,password-echo=no"
+ else:
+ kernel_options += f" cryptdevice=PARTUUID={real_device.part_uuid}:luksdev"
+
+ entry.write(f'{kernel_options} root=/dev/mapper/luksdev {options_entry}')
+
+ if self._disk_encryption and self._disk_encryption.hsm_device:
# Note: lsblk UUID must be used, not PARTUUID for sd-encrypt to work
kernel_options += f" rd.luks.name={real_device.uuid}=luksdev"
# Note: tpm2-device and fido2-device don't play along very well:
# https://github.com/archlinux/archinstall/pull/1196#issuecomment-1129715645
kernel_options += f" rd.luks.options=fido2-device=auto,password-echo=no"
else:
- kernel_options += f" cryptdevice=PARTUUID={real_device.part_uuid}:luksdev"
-
- entry.write(f'{kernel_options} root=/dev/mapper/luksdev {options_entry}')
- else:
- log(f"Identifying root partition by PARTUUID on {root_partition}, looking for '{root_partition.part_uuid}'.", level=logging.DEBUG)
- entry.write(f'options root=PARTUUID={root_partition.part_uuid} {options_entry}')
+ log(f"Identifying root partition by PARTUUID on {root_partition}, looking for '{root_partition.part_uuid}'.", level=logging.DEBUG)
+ entry.write(f'options root=PARTUUID={root_partition.part_uuid} {options_entry}')
self.helper_flags['bootloader'] = "systemd"
@@ -923,15 +967,15 @@ class Installer:
if has_uefi():
self.pacstrap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead?
try:
- SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable', peak_output=True)
+ SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable', peek_output=True)
except SysCallError:
try:
- SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable', peak_output=True)
+ SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable', peek_output=True)
except SysCallError as error:
raise DiskError(f"Could not install GRUB to {self.target}/boot: {error}")
else:
try:
- SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=i386-pc --recheck {boot_partition.parent}', peak_output=True)
+ SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=i386-pc --recheck {boot_partition.parent}', peek_output=True)
except SysCallError as error:
raise DiskError(f"Could not install GRUB to {boot_partition.path}: {error}")
@@ -1109,8 +1153,10 @@ class Installer:
if not handled_by_plugin:
self.log(f'Creating user {user}', level=logging.INFO)
- if not (output := SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}')).exit_code == 0:
- raise SystemError(f"Could not create user inside installation: {output}")
+ try:
+ SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}')
+ except SysCallError as error:
+ raise SystemError(f"Could not create user inside installation: {error}")
for plugin in plugins.values():
if hasattr(plugin, 'on_user_created'):
@@ -1138,17 +1184,28 @@ class Installer:
echo = shlex.join(['echo', combo])
sh = shlex.join(['sh', '-c', echo])
- result = SysCommand(f"/usr/bin/arch-chroot {self.target} " + sh[:-1] + " | chpasswd'")
- return result.exit_code == 0
+ try:
+ SysCommand(f"/usr/bin/arch-chroot {self.target} " + sh[:-1] + " | chpasswd'")
+ return True
+ except SysCallError:
+ return False
def user_set_shell(self, user :str, shell :str) -> bool:
self.log(f'Setting shell for {user} to {shell}', level=logging.INFO)
- return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"").exit_code == 0
+ try:
+ SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"")
+ return True
+ except SysCallError:
+ return False
def chown(self, owner :str, path :str, options :List[str] = []) -> bool:
cleaned_path = path.replace('\'', '\\\'')
- return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {cleaned_path}'").exit_code == 0
+ try:
+ SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {cleaned_path}'")
+ return True
+ except SysCallError:
+ return False
def create_file(self, filename :str, owner :Optional[str] = None) -> InstallationFile:
return InstallationFile(self, filename, owner)
@@ -1166,8 +1223,10 @@ class Installer:
with Boot(self) as session:
os.system('/usr/bin/systemd-run --machine=archinstall --pty localectl set-keymap ""')
- if (output := session.SysCommand(["localectl", "set-keymap", language])).exit_code != 0:
- raise ServiceException(f"Unable to set locale '{language}' for console: {output}")
+ try:
+ session.SysCommand(["localectl", "set-keymap", language])
+ except SysCallError as error:
+ raise ServiceException(f"Unable to set locale '{language}' for console: {error}")
self.log(f"Keyboard language for this installation is now set to: {language}")
else:
@@ -1190,8 +1249,10 @@ class Installer:
with Boot(self) as session:
session.SysCommand(["localectl", "set-x11-keymap", '""'])
- if (output := session.SysCommand(["localectl", "set-x11-keymap", language])).exit_code != 0:
- raise ServiceException(f"Unable to set locale '{language}' for X11: {output}")
+ try:
+ session.SysCommand(["localectl", "set-x11-keymap", language])
+ except SysCallError as error:
+ raise ServiceException(f"Unable to set locale '{language}' for X11: {error}")
else:
self.log(f'X11-Keyboard language was not changed from default (no language specified).', fg="yellow", level=logging.INFO)
diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py
index 7e4534d8..ad6bf093 100644
--- a/archinstall/lib/luks.py
+++ b/archinstall/lib/luks.py
@@ -115,7 +115,7 @@ class luks2:
if cmd_handle.exit_code != 0:
raise DiskError(f'Could not encrypt volume "{partition.path}": {b"".join(cmd_handle)}')
except SysCallError as err:
- if err.exit_code == 256:
+ if err.exit_code == 1:
log(f'{partition} is being used, trying to unmount and crypt-close the device and running one more attempt at encrypting the device.', level=logging.DEBUG)
# Partition was in use, unmount it and try again
partition.unmount()
diff --git a/archinstall/lib/menu/abstract_menu.py b/archinstall/lib/menu/abstract_menu.py
index 5a7ca03a..d659d709 100644
--- a/archinstall/lib/menu/abstract_menu.py
+++ b/archinstall/lib/menu/abstract_menu.py
@@ -17,14 +17,14 @@ class Selector:
def __init__(
self,
description :str,
- func :Callable = None,
- display_func :Callable = None,
+ func :Optional[Callable] = None,
+ display_func :Optional[Callable] = None,
default :Any = None,
enabled :bool = False,
dependencies :List = [],
dependencies_not :List = [],
- exec_func :Callable = None,
- preview_func :Callable = None,
+ exec_func :Optional[Callable] = None,
+ preview_func :Optional[Callable] = None,
mandatory :bool = False,
no_store :bool = False
):
@@ -165,7 +165,7 @@ class Selector:
class AbstractMenu:
- def __init__(self, data_store: Dict[str, Any] = None, auto_cursor=False, preview_size :float = 0.2):
+ def __init__(self, data_store: Optional[Dict[str, Any]] = None, auto_cursor=False, preview_size :float = 0.2):
"""
Create a new selection menu.
@@ -226,7 +226,7 @@ class AbstractMenu:
""" will be called before each action in the menu """
return
- def post_callback(self, selection_name: str = None, value: Any = None):
+ def post_callback(self, selection_name: Optional[str] = None, value: Any = None):
""" will be called after each action in the menu """
return True
@@ -356,7 +356,7 @@ class AbstractMenu:
config_name, selector = self._find_selection(selection_name)
return self.exec_option(config_name, selector)
- def exec_option(self, config_name :str, p_selector :Selector = None) -> bool:
+ def exec_option(self, config_name :str, p_selector :Optional[Selector] = None) -> bool:
""" processes the execution of a given menu entry
- pre process callback
- selection function
@@ -372,13 +372,13 @@ class AbstractMenu:
self.pre_callback(config_name)
result = None
- if selector.func:
+ if selector.func is not None:
presel_val = self.option(config_name).get_selection()
result = selector.func(presel_val)
self._menu_options[config_name].set_current_selection(result)
if selector.do_store():
self._data_store[config_name] = result
- exec_ret_val = selector.exec_func(config_name,result) if selector.exec_func else False
+ exec_ret_val = selector.exec_func(config_name,result) if selector.exec_func is not None else False
self.post_callback(config_name,result)
if exec_ret_val and self._check_mandatory_status():
@@ -478,7 +478,7 @@ class AbstractMenu:
class AbstractSubMenu(AbstractMenu):
- def __init__(self, data_store: Dict[str, Any] = None):
+ def __init__(self, data_store: Optional[Dict[str, Any]] = None):
super().__init__(data_store=data_store)
self._menu_options['__separator__'] = Selector('')
diff --git a/archinstall/lib/menu/global_menu.py b/archinstall/lib/menu/global_menu.py
index 0d348227..7c5b153e 100644
--- a/archinstall/lib/menu/global_menu.py
+++ b/archinstall/lib/menu/global_menu.py
@@ -197,11 +197,11 @@ class GlobalMenu(AbstractMenu):
self._menu_options['abort'] = Selector(_('Abort'), exec_func=lambda n,v:exit(1))
- def _update_install_text(self, name :str = None, result :Any = None):
+ def _update_install_text(self, name :Optional[str] = None, result :Any = None):
text = self._install_text()
self._menu_options['install'].update_description(text)
- def post_callback(self,name :str = None ,result :Any = None):
+ def post_callback(self,name :Optional[str] = None ,result :Any = None):
self._update_install_text(name, result)
def _install_text(self):
@@ -279,8 +279,8 @@ class GlobalMenu(AbstractMenu):
output = str(_('Encryption type')) + f': {enc_type}\n'
output += str(_('Password')) + f': {secret(encryption.encryption_password)}\n'
- if encryption.partitions:
- output += 'Partitions: {} selected'.format(len(encryption.partitions)) + '\n'
+ if encryption.all_partitions:
+ output += 'Partitions: {} selected'.format(len(encryption.all_partitions)) + '\n'
if encryption.hsm_device:
output += f'HSM: {encryption.hsm_device.manufacturer}'
@@ -377,9 +377,9 @@ class GlobalMenu(AbstractMenu):
return harddrives
- def _select_profile(self, preset):
+ def _select_profile(self, preset) -> Optional[Profile]:
+ ret: Optional[Profile] = None
profile = select_profile(preset)
- ret = None
if profile is None:
if any([
@@ -403,7 +403,7 @@ class GlobalMenu(AbstractMenu):
namespace = f'{profile.namespace}.py'
with profile.load_instructions(namespace=namespace) as imported:
if imported._prep_function(servers=servers, desktop=desktop, desktop_env=desktop_env, gfx_driver=gfx_driver):
- ret: Profile = profile
+ ret = profile
match ret.name:
case 'minimal':
diff --git a/archinstall/lib/models/disk_encryption.py b/archinstall/lib/models/disk_encryption.py
index 3edab93e..a4a501d9 100644
--- a/archinstall/lib/models/disk_encryption.py
+++ b/archinstall/lib/models/disk_encryption.py
@@ -1,5 +1,7 @@
+from __future__ import annotations
+
from dataclasses import dataclass, field
-from enum import Enum, auto
+from enum import Enum
from typing import Optional, List, Dict, TYPE_CHECKING, Any
from ..hsm.fido import Fido2Device
@@ -9,8 +11,7 @@ if TYPE_CHECKING:
class EncryptionType(Enum):
- Partition = auto()
- # FullDiskEncryption = auto()
+ Partition = 'partition'
@classmethod
def _encryption_type_mapper(cls) -> Dict[str, 'EncryptionType']:
@@ -35,9 +36,55 @@ class EncryptionType(Enum):
class DiskEncryption:
encryption_type: EncryptionType = EncryptionType.Partition
encryption_password: str = ''
- partitions: List[str] = field(default_factory=list)
+ partitions: Dict[str, List[Dict[str, Any]]] = field(default_factory=dict)
hsm_device: Optional[Fido2Device] = None
+ @property
+ def all_partitions(self) -> List[Dict[str, Any]]:
+ _all: List[Dict[str, Any]] = []
+ for parts in self.partitions.values():
+ _all += parts
+ return _all
+
def generate_encryption_file(self, partition) -> bool:
- return partition in self.partitions and partition['mountpoint'] != '/'
-
+ return partition in self.all_partitions and partition['mountpoint'] != '/'
+
+ def json(self) -> Dict[str, Any]:
+ obj = {
+ 'encryption_type': self.encryption_type.value,
+ 'partitions': self.partitions
+ }
+
+ if self.hsm_device:
+ obj['hsm_device'] = self.hsm_device.json()
+
+ return obj
+
+ @classmethod
+ def parse_arg(
+ cls,
+ disk_layout: Dict[str, Any],
+ arg: Dict[str, Any],
+ password: str = ''
+ ) -> 'DiskEncryption':
+ # we have to map the enc partition config to the disk layout objects
+ # they both need to point to the same object as it will get modified
+ # during the installation process
+ enc_partitions: Dict[str, List[Dict[str, Any]]] = {}
+
+ for path, partitions in disk_layout.items():
+ conf_partitions = arg['partitions'].get(path, [])
+ for part in partitions['partitions']:
+ if part in conf_partitions:
+ enc_partitions.setdefault(path, []).append(part)
+
+ enc = DiskEncryption(
+ EncryptionType(arg['encryption_type']),
+ password,
+ enc_partitions
+ )
+
+ if hsm := arg.get('hsm_device', None):
+ enc.hsm_device = Fido2Device.parse_arg(hsm)
+
+ return enc
diff --git a/archinstall/lib/packages/packages.py b/archinstall/lib/packages/packages.py
index d5834f43..0743e83b 100644
--- a/archinstall/lib/packages/packages.py
+++ b/archinstall/lib/packages/packages.py
@@ -1,3 +1,4 @@
+import dataclasses
import json
import ssl
from typing import Dict, Any, Tuple, List
@@ -112,4 +113,4 @@ def installed_package(package :str) -> LocalPackage:
except SysCallError:
pass
- return LocalPackage(**package_info)
+ return LocalPackage({field.name: package_info.get(field.name) for field in dataclasses.fields(LocalPackage)})
diff --git a/archinstall/lib/systemd.py b/archinstall/lib/systemd.py
index f459f94b..64ffcae4 100644
--- a/archinstall/lib/systemd.py
+++ b/archinstall/lib/systemd.py
@@ -97,8 +97,6 @@ class Boot:
shutdown = SysCommand(f'systemd-run --machine={self.container_name} --pty shutdown now')
except SysCallError as error:
shutdown_exit_code = error.exit_code
- # if error.exit_code == 256:
- # pass
while self.session.is_alive():
time.sleep(0.25)
diff --git a/archinstall/lib/user_interaction/general_conf.py b/archinstall/lib/user_interaction/general_conf.py
index 76631a98..fc7ded45 100644
--- a/archinstall/lib/user_interaction/general_conf.py
+++ b/archinstall/lib/user_interaction/general_conf.py
@@ -174,7 +174,10 @@ def select_profile(preset) -> Optional[Profile]:
storage['profile_minimal'] = False
storage['_selected_servers'] = []
storage['_desktop_profile'] = None
+ storage['sway_sys_priv_ctrl'] = None
+ storage['arguments']['sway_sys_priv_ctrl'] = None
storage['arguments']['desktop-environment'] = None
+ storage['arguments']['gfx_driver'] = None
storage['arguments']['gfx_driver_packages'] = None
return None
case MenuSelectionType.Skip:
diff --git a/archinstall/lib/user_interaction/partitioning_conf.py b/archinstall/lib/user_interaction/partitioning_conf.py
index cff76dc2..0a5ede51 100644
--- a/archinstall/lib/user_interaction/partitioning_conf.py
+++ b/archinstall/lib/user_interaction/partitioning_conf.py
@@ -208,7 +208,7 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str,
if fs_choice.type_ == MenuSelectionType.Skip:
continue
- prompt = str(_('Enter the start sector (percentage or block number, default: {}): ')).format(
+ prompt = str(_('Enter the start location (in parted units: s, GB, %, etc. ; default: {}): ')).format(
block_device.first_free_sector
)
start = input(prompt).strip()
@@ -219,7 +219,7 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str,
else:
end_suggested = '100%'
- prompt = str(_('Enter the end sector of the partition (percentage or block number, ex: {}): ')).format(
+ prompt = str(_('Enter the end location (in parted units: s, GB, %, etc. ; ex: {}): ')).format(
end_suggested
)
end = input(prompt).strip()
diff --git a/archinstall/lib/user_interaction/save_conf.py b/archinstall/lib/user_interaction/save_conf.py
index d60ef995..5b4ae2b3 100644
--- a/archinstall/lib/user_interaction/save_conf.py
+++ b/archinstall/lib/user_interaction/save_conf.py
@@ -1,9 +1,12 @@
from __future__ import annotations
+import logging
+
from pathlib import Path
from typing import Any, Dict, TYPE_CHECKING
from ..configuration import ConfigurationOutput
+from ..general import SysCommand
from ..menu import Menu
from ..menu.menu import MenuSelectionType
from ..output import log
@@ -58,20 +61,75 @@ def save_config(config: Dict):
if choice.type_ == MenuSelectionType.Skip:
return
- while True:
- path = input(_('Enter a directory for the configuration(s) to be saved: ')).strip(' ')
- dest_path = Path(path)
- if dest_path.exists() and dest_path.is_dir():
- break
- log(_('Not a valid directory: {}').format(dest_path), fg='red')
-
- if options['user_config'] == choice.value:
- config_output.save_user_config(dest_path)
- elif options['user_creds'] == choice.value:
- config_output.save_user_creds(dest_path)
- elif options['disk_layout'] == choice.value:
- config_output.save_disk_layout(dest_path)
- elif options['all'] == choice.value:
- config_output.save_user_config(dest_path)
- config_output.save_user_creds(dest_path)
- config_output.save_disk_layout(dest_path)
+ dirs_to_exclude = [
+ '/bin',
+ '/dev',
+ '/lib',
+ '/lib64',
+ '/lost+found',
+ '/opt',
+ '/proc',
+ '/run',
+ '/sbin',
+ '/srv',
+ '/sys',
+ '/usr',
+ '/var',
+ ]
+ log(
+ _('When picking a directory to save configuration files to,'
+ ' by default we will ignore the following folders: ') + ','.join(dirs_to_exclude),
+ level=logging.DEBUG
+ )
+
+ log(_('Finding possible directories to save configuration files ...'), level=logging.INFO)
+
+ find_exclude = '-path ' + ' -prune -o -path '.join(dirs_to_exclude) + ' -prune '
+ file_picker_command = f'find / {find_exclude} -o -type d -print0'
+ possible_save_dirs = list(
+ filter(None, SysCommand(file_picker_command).decode().split('\x00'))
+ )
+
+ selection = Menu(
+ _('Select directory (or directories) for saving configuration files'),
+ possible_save_dirs,
+ multi=True,
+ skip=True,
+ allow_reset=False,
+ ).run()
+
+ match selection.type_:
+ case MenuSelectionType.Skip:
+ return
+ case _:
+ save_dirs = selection.value
+
+ prompt = _('Do you want to save {} configuration file(s) in the following locations?\n\n{}').format(
+ list(options.keys())[list(options.values()).index(choice.value)],
+ save_dirs
+ )
+ save_confirmation = Menu(prompt, Menu.yes_no(), default_option=Menu.yes()).run()
+ if save_confirmation == Menu.no():
+ return
+
+ log(
+ _('Saving {} configuration files to {}').format(
+ list(options.keys())[list(options.values()).index(choice.value)],
+ save_dirs
+ ),
+ level=logging.DEBUG
+ )
+
+ if save_dirs is not None:
+ for save_dir_str in save_dirs:
+ save_dir = Path(save_dir_str)
+ if options['user_config'] == choice.value:
+ config_output.save_user_config(save_dir)
+ elif options['user_creds'] == choice.value:
+ config_output.save_user_creds(save_dir)
+ elif options['disk_layout'] == choice.value:
+ config_output.save_disk_layout(save_dir)
+ elif options['all'] == choice.value:
+ config_output.save_user_config(save_dir)
+ config_output.save_user_creds(save_dir)
+ config_output.save_disk_layout(save_dir)
diff --git a/archinstall/lib/user_interaction/system_conf.py b/archinstall/lib/user_interaction/system_conf.py
index 8454a3da..68a1a7d2 100644
--- a/archinstall/lib/user_interaction/system_conf.py
+++ b/archinstall/lib/user_interaction/system_conf.py
@@ -60,7 +60,6 @@ def select_harddrives(preset: List[str] = []) -> List[str]:
selected_harddrive = Menu(
title,
list(options.keys()),
- preset_values=preset,
multi=True,
allow_reset=True,
allow_reset_warning_msg=warning