index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
author | Andreas Baumann <mail@andreasbaumann.cc> | 2023-04-02 13:50:24 +0200 |
---|---|---|
committer | Andreas Baumann <mail@andreasbaumann.cc> | 2023-04-02 13:50:24 +0200 |
commit | af7ab9833c9f9944874f0162ae0975175ddc628d (patch) | |
tree | c486d4948b5994125f95c8aa1d61a059c1351127 /archinstall/lib | |
parent | 2caad35a885f9be30d5bf79a47f5456f276ae67b (diff) | |
parent | 6e3c6f8863041b54f6d8cf7af37f9719c493eadd (diff) |
-rw-r--r-- | archinstall/lib/configuration.py | 5 | ||||
-rw-r--r-- | archinstall/lib/disk/encryption.py | 26 | ||||
-rw-r--r-- | archinstall/lib/disk/filesystem.py | 2 | ||||
-rw-r--r-- | archinstall/lib/disk/helpers.py | 88 | ||||
-rw-r--r-- | archinstall/lib/disk/mapperdev.py | 4 | ||||
-rw-r--r-- | archinstall/lib/disk/partition.py | 133 | ||||
-rw-r--r-- | archinstall/lib/disk/validators.py | 6 | ||||
-rw-r--r-- | archinstall/lib/general.py | 50 | ||||
-rw-r--r-- | archinstall/lib/hsm/fido.py | 21 | ||||
-rw-r--r-- | archinstall/lib/installer.py | 237 | ||||
-rw-r--r-- | archinstall/lib/luks.py | 2 | ||||
-rw-r--r-- | archinstall/lib/menu/abstract_menu.py | 20 | ||||
-rw-r--r-- | archinstall/lib/menu/global_menu.py | 14 | ||||
-rw-r--r-- | archinstall/lib/models/disk_encryption.py | 59 | ||||
-rw-r--r-- | archinstall/lib/packages/packages.py | 3 | ||||
-rw-r--r-- | archinstall/lib/systemd.py | 2 | ||||
-rw-r--r-- | archinstall/lib/user_interaction/general_conf.py | 3 | ||||
-rw-r--r-- | archinstall/lib/user_interaction/partitioning_conf.py | 4 | ||||
-rw-r--r-- | archinstall/lib/user_interaction/save_conf.py | 92 | ||||
-rw-r--r-- | archinstall/lib/user_interaction/system_conf.py | 1 |
diff --git a/archinstall/lib/configuration.py b/archinstall/lib/configuration.py index ad537b21..c036783f 100644 --- a/archinstall/lib/configuration.py +++ b/archinstall/lib/configuration.py @@ -44,7 +44,7 @@ class ConfigurationOutput: self._disk_layout_file = "user_disk_layout.json" self._sensitive = ['!users'] - self._ignore = ['abort', 'install', 'config', 'creds', 'dry_run', 'disk_encryption'] + self._ignore = ['abort', 'install', 'config', 'creds', 'dry_run'] self._process_config() @@ -71,6 +71,9 @@ class ConfigurationOutput: else: self._user_config[key] = self._config[key] + if key == 'disk_encryption' and self._config[key]: # special handling for encryption password + self._user_credentials['encryption_password'] = self._config[key].encryption_password + def user_config_to_json(self) -> str: return json.dumps({ 'config_version': storage['__version__'], # Tells us what version was used to generate the config diff --git a/archinstall/lib/disk/encryption.py b/archinstall/lib/disk/encryption.py index 67f656c8..c7496bfa 100644 --- a/archinstall/lib/disk/encryption.py +++ b/archinstall/lib/disk/encryption.py @@ -46,7 +46,7 @@ class DiskEncryptionMenu(AbstractSubMenu): Selector( _('Partitions'), func=lambda preset: select_partitions_to_encrypt(self._disk_layouts, preset), - display_func=lambda x: f'{len(x)} {_("Partitions")}' if x else None, + display_func=lambda x: f'{sum([len(y) for y in x.values()])} {_("Partitions")}' if x else None, dependencies=['encryption_password'], default=self._preset.partitions, preview_func=self._prev_disk_layouts, @@ -86,9 +86,14 @@ class DiskEncryptionMenu(AbstractSubMenu): def _prev_disk_layouts(self) -> Optional[str]: selector = self._menu_options['partitions'] if selector.has_selection(): - partitions: List[Any] = selector.current_selection + partitions: Dict[str, Any] = selector.current_selection + + all_partitions = [] + for parts in partitions.values(): + all_partitions += parts + output = str(_('Partitions to be encrypted')) + '\n' - output += current_partition_layout(partitions, with_title=False) + output += current_partition_layout(all_partitions, with_title=False) return output.rstrip() return None @@ -132,7 +137,7 @@ def select_hsm(preset: Optional[Fido2Device] = None) -> Optional[Fido2Device]: return None -def select_partitions_to_encrypt(disk_layouts: Dict[str, Any], preset: List[Any]) -> List[Any]: +def select_partitions_to_encrypt(disk_layouts: Dict[str, Any], preset: Dict[str, Any]) -> Dict[str, Any]: # If no partitions was marked as encrypted, but a password was supplied and we have some disks to format.. # Then we need to identify which partitions to encrypt. This will default to / (root). all_partitions = [] @@ -153,10 +158,17 @@ def select_partitions_to_encrypt(disk_layouts: Dict[str, Any], preset: List[Any] match choice.type_: case MenuSelectionType.Reset: - return [] + return {} case MenuSelectionType.Skip: return preset case MenuSelectionType.Selection: - return choice.value # type: ignore + selections: List[Any] = choice.value # type: ignore + partitions = {} + + for path, device in disk_layouts.items(): + for part in selections: + if part in device.get('partitions', []): + partitions.setdefault(path, []).append(part) - return [] + return partitions + return {} diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py index bdfa502a..1083df53 100644 --- a/archinstall/lib/disk/filesystem.py +++ b/archinstall/lib/disk/filesystem.py @@ -113,7 +113,7 @@ class Filesystem: format_options = partition.get('options',[]) + partition.get('filesystem',{}).get('format_options',[]) disk_encryption: DiskEncryption = storage['arguments'].get('disk_encryption') - if partition in disk_encryption.partitions: + if disk_encryption and partition in disk_encryption.all_partitions: if not partition['device_instance']: raise DiskError(f"Internal error caused us to loose the partition. Please report this issue upstream!") diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py index a5164b76..80d0cb53 100644 --- a/archinstall/lib/disk/helpers.py +++ b/archinstall/lib/disk/helpers.py @@ -212,6 +212,47 @@ def all_disks() -> List[BlockDevice]: log(f"[Deprecated] archinstall.all_disks() is deprecated. Use archinstall.all_blockdevices() with the appropriate filters instead.", level=logging.WARNING, fg="yellow") return all_blockdevices(partitions=False, mappers=False) +def get_blockdevice_info(device_path, exclude_iso_dev :bool = True) -> Dict[str, Any]: + for retry_attempt in range(storage['DISK_RETRY_ATTEMPTS']): + partprobe(device_path) + time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * retry_attempt)) + + try: + if exclude_iso_dev: + # exclude all devices associated with the iso boot locations + iso_devs = ['/run/archiso/airootfs', '/run/archiso/bootmnt'] + + try: + lsblk_info = get_lsblk_info(device_path) + except DiskError: + continue + + if any([dev in lsblk_info.mountpoints for dev in iso_devs]): + continue + + information = blkid(f'blkid -p -o export {device_path}') + return enrich_blockdevice_information(information) + except SysCallError as ex: + if ex.exit_code == 2: + # Assume that it's a loop device, and try to get info on it + try: + resolved_device_name = device_path.readlink().name + except OSError: + resolved_device_name = device_path.name + + try: + information = get_loop_info(device_path) + if not information: + raise SysCallError(f"Could not get loop information for {resolved_device_name}", exit_code=1) + return enrich_blockdevice_information(information) + + except SysCallError: + information = get_blockdevice_uevent(resolved_device_name) + return enrich_blockdevice_information(information) + else: + # We could not reliably get any information, perhaps the disk is clean of information? + if retry_attempt == storage['DISK_RETRY_ATTEMPTS'] - 1: + raise ex def all_blockdevices( mappers: bool = False, @@ -230,40 +271,18 @@ def all_blockdevices( # we'll iterate the /sys/class definitions and find the information # from there. for block_device in glob.glob("/sys/class/block/*"): - device_path = pathlib.Path(f"/dev/{pathlib.Path(block_device).readlink().name}") + try: + device_path = pathlib.Path(f"/dev/{pathlib.Path(block_device).readlink().name}") + except FileNotFoundError: + log(f"Unknown device found by '/sys/class/block/*', ignoring: {device_path}", level=logging.WARNING, fg="yellow") if device_path.exists() is False: log(f"Unknown device found by '/sys/class/block/*', ignoring: {device_path}", level=logging.WARNING, fg="yellow") continue - try: - if exclude_iso_dev: - # exclude all devices associated with the iso boot locations - iso_devs = ['/run/archiso/airootfs', '/run/archiso/bootmnt'] - lsblk_info = get_lsblk_info(device_path) - if any([dev in lsblk_info.mountpoints for dev in iso_devs]): - continue - - information = blkid(f'blkid -p -o export {device_path}') - except SysCallError as ex: - if ex.exit_code in (512, 2): - # Assume that it's a loop device, and try to get info on it - try: - information = get_loop_info(device_path) - if not information: - print("Exit code for blkid -p -o export was:", ex.exit_code) - raise SysCallError("Could not get loop information", exit_code=1) - - except SysCallError: - print("Not a loop device, trying uevent rules.") - information = get_blockdevice_uevent(pathlib.Path(block_device).readlink().name) - else: - # We could not reliably get any information, perhaps the disk is clean of information? - print("Raising ex because:", ex.exit_code) - raise ex - # return instances - - information = enrich_blockdevice_information(information) + information = get_blockdevice_info(device_path) + if not information: + continue for path, path_info in information.items(): if path_info.get('DMCRYPT_NAME'): @@ -409,7 +428,6 @@ def get_partitions_in_use(mountpoint :str) -> Dict[str, Any]: return {} output = json.loads(output) - # print(output) mounts = {} @@ -421,11 +439,13 @@ def get_partitions_in_use(mountpoint :str) -> Dict[str, Any]: continue if isinstance(blockdev, Partition): - for blockdev_mountpoint in blockdev.mountpoints: - block_devices_mountpoints[blockdev_mountpoint] = blockdev + if blockdev.mountpoints: + for blockdev_mountpoint in blockdev.mountpoints: + block_devices_mountpoints[blockdev_mountpoint] = blockdev else: - for blockdev_mountpoint in blockdev.mount_information: - block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev + if blockdev.mount_information: + for blockdev_mountpoint in blockdev.mount_information: + block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev log(f'Filtering available mounts {block_devices_mountpoints} to those under {mountpoint}', level=logging.DEBUG) diff --git a/archinstall/lib/disk/mapperdev.py b/archinstall/lib/disk/mapperdev.py index 71ef2a79..bf1b3583 100644 --- a/archinstall/lib/disk/mapperdev.py +++ b/archinstall/lib/disk/mapperdev.py @@ -25,6 +25,10 @@ class MapperDev: return f"/dev/mapper/{self.mappername}" @property + def part_uuid(self): + return self.partition.part_uuid + + @property def partition(self): from .helpers import uevent, get_parent_of_partition from .partition import Partition diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py index 9febf102..87eaa6a7 100644 --- a/archinstall/lib/disk/partition.py +++ b/archinstall/lib/disk/partition.py @@ -98,17 +98,18 @@ class Partition: if mountpoint: self.mount(mountpoint) - self._partition_info = self._fetch_information() - - if not autodetect_filesystem and filesystem: - self._partition_info.filesystem_type = filesystem + try: + self._partition_info = self._fetch_information() + + if not autodetect_filesystem and filesystem: + self._partition_info.filesystem_type = filesystem - if self._partition_info.filesystem_type == 'crypto_LUKS': - self._encrypted = True + if self._partition_info.filesystem_type == 'crypto_LUKS': + self._encrypted = True + except DiskError: + self._partition_info = None - # I hate doint this but I'm currently unsure where this - # is acutally used to be able to fix the typing issues properly - @typing.no_type_check + @typing.no_type_check # I hate doint this but I'm currently unsure where this is used. def __lt__(self, left_comparitor :BlockDevice) -> bool: if type(left_comparitor) == Partition: left_comparitor = left_comparitor.path @@ -120,14 +121,17 @@ class Partition: def __repr__(self, *args :str, **kwargs :str) -> str: mount_repr = '' - if mountpoint := self._partition_info.get_first_mountpoint(): - mount_repr = f", mounted={mountpoint}" - elif self._target_mountpoint: - mount_repr = f", rel_mountpoint={self._target_mountpoint}" + if self._partition_info: + if mountpoint := self._partition_info.get_first_mountpoint(): + mount_repr = f", mounted={mountpoint}" + elif self._target_mountpoint: + mount_repr = f", rel_mountpoint={self._target_mountpoint}" classname = self.__class__.__name__ - if self._encrypted: + if not self._partition_info: + return f'{classname}(path={self._path})' + elif self._encrypted: return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, parent={self.real_device}, fs={self._partition_info.filesystem_type}{mount_repr})' else: return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, fs={self._partition_info.filesystem_type}{mount_repr})' @@ -146,7 +150,7 @@ class Partition: 'encrypted': self._encrypted, 'start': self.start, 'size': self.end, - 'filesystem': self._partition_info.filesystem_type + 'filesystem': self._partition_info.filesystem_type if self._partition_info else 'Unknown' } return partition_info @@ -164,34 +168,37 @@ class Partition: 'start': self.start, 'size': self.end, 'filesystem': { - 'format': self._partition_info.filesystem_type + 'format': self._partition_info.filesystem_type if self._partition_info else 'None' } } def _call_lsblk(self) -> Dict[str, Any]: - self.partprobe() - # This sleep might be overkill, but lsblk is known to - # work against a chaotic cache that can change during call - # causing no information to be returned (blkid is better) - # time.sleep(1) + for retry_attempt in range(storage['DISK_RETRY_ATTEMPTS']): + self.partprobe() + time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * retry_attempt)) # TODO: Remove, we should be relying on blkid instead of lsblk + # This sleep might be overkill, but lsblk is known to + # work against a chaotic cache that can change during call + # causing no information to be returned (blkid is better) + # time.sleep(1) - # TODO: Maybe incorporate a re-try system here based on time.sleep(max(0.1, storage.get('DISK_TIMEOUTS', 1))) + # TODO: Maybe incorporate a re-try system here based on time.sleep(max(0.1, storage.get('DISK_TIMEOUTS', 1))) - try: - output = SysCommand(f"lsblk --json -b -o+LOG-SEC,SIZE,PTTYPE,PARTUUID,UUID,FSTYPE {self.device_path}").decode('UTF-8') - except SysCallError as error: - # It appears as if lsblk can return exit codes like 8192 to indicate something. - # But it does return output so we'll try to catch it. - output = error.worker.decode('UTF-8') - - if output: try: - lsblk_info = json.loads(output) - return lsblk_info - except json.decoder.JSONDecodeError: - log(f"Could not decode JSON: {output}", fg="red", level=logging.ERROR) - - raise DiskError(f'Failed to read disk "{self.device_path}" with lsblk') + output = SysCommand(f"lsblk --json -b -o+LOG-SEC,SIZE,PTTYPE,PARTUUID,UUID,FSTYPE {self.device_path}").decode('UTF-8') + except SysCallError as error: + # Get the output minus the message/info from lsblk if it returns a non-zero exit code. + output = error.worker.decode('UTF-8') + if '{' in output: + output = output[output.find('{'):] + + if output: + try: + lsblk_info = json.loads(output) + return lsblk_info + except json.decoder.JSONDecodeError: + log(f"Could not decode JSON: {output}", fg="red", level=logging.ERROR) + + raise DiskError(f'Failed to get partition information "{self.device_path}" with lsblk') def _call_sfdisk(self) -> Dict[str, Any]: output = SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8') @@ -212,9 +219,12 @@ class Partition: lsblk_info = self._call_lsblk() sfdisk_info = self._call_sfdisk() - if not (device := lsblk_info.get('blockdevices', [None])[0]): + if not (device := lsblk_info.get('blockdevices', [])): raise DiskError(f'Failed to retrieve information for "{self.device_path}" with lsblk') + # Grab the first (and only) block device in the list as we're targeting a specific partition + device = device[0] + mountpoints = [Path(mountpoint) for mountpoint in device['mountpoints'] if mountpoint] bootable = sfdisk_info.get('bootable', False) or sfdisk_info.get('type', '') == 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B' @@ -243,7 +253,8 @@ class Partition: @property def filesystem(self) -> str: - return self._partition_info.filesystem_type + if self._partition_info: + return self._partition_info.filesystem_type @property def mountpoint(self) -> Optional[Path]: @@ -253,43 +264,51 @@ class Partition: @property def mountpoints(self) -> List[Path]: - return self._partition_info.mountpoints + if self._partition_info: + return self._partition_info.mountpoints @property def sector_size(self) -> int: - return self._partition_info.sector_size + if self._partition_info: + return self._partition_info.sector_size @property def start(self) -> Optional[int]: - return self._partition_info.start + if self._partition_info: + return self._partition_info.start @property def end(self) -> Optional[int]: - return self._partition_info.end + if self._partition_info: + return self._partition_info.end @property def end_sectors(self) -> Optional[int]: - start = self._partition_info.start - end = self._partition_info.end - if start and end: - return start + end - return None + if self._partition_info: + start = self._partition_info.start + end = self._partition_info.end + if start and end: + return start + end @property def size(self) -> Optional[float]: - return self._partition_info.size + if self._partition_info: + return self._partition_info.size @property def boot(self) -> bool: - return self._partition_info.bootable + if self._partition_info: + return self._partition_info.bootable @property def partition_type(self) -> Optional[str]: - return self._partition_info.pttype + if self._partition_info: + return self._partition_info.pttype @property def part_uuid(self) -> str: - return self._partition_info.partuuid + if self._partition_info: + return self._partition_info.partuuid @property def uuid(self) -> Optional[str]: @@ -355,7 +374,8 @@ class Partition: log(f"Could not get PARTUUID of partition using 'blkid -s PARTUUID -o value {self.device_path}': {error}") - return self._partition_info.uuid + if self._partition_info: + return self._partition_info.uuid @property def encrypted(self) -> Union[bool, None]: @@ -611,14 +631,7 @@ class Partition: return False def unmount(self) -> bool: - worker = SysCommand(f"/usr/bin/umount {self._path}") - exit_code = worker.exit_code - - # Without to much research, it seams that low error codes are errors. - # And above 8k is indicators such as "/dev/x not mounted.". - # So anything in between 0 and 8k are errors (?). - if exit_code and 0 < exit_code < 8000: - raise SysCallError(f"Could not unmount {self._path} properly: {worker}", exit_code=exit_code) + SysCommand(f"/usr/bin/umount {self._path}") # Update the partition info since the mount info has changed after this call. self._partition_info = self._fetch_information() diff --git a/archinstall/lib/disk/validators.py b/archinstall/lib/disk/validators.py index 54808886..076a8ba2 100644 --- a/archinstall/lib/disk/validators.py +++ b/archinstall/lib/disk/validators.py @@ -7,10 +7,12 @@ def valid_parted_position(pos :str) -> bool: if pos.isdigit(): return True - if pos.lower().endswith('b') and pos[:-1].isdigit(): + pos_lower = pos.lower() + + if (pos_lower.endswith('b') or pos_lower.endswith('s')) and pos[:-1].isdigit(): return True - if any(pos.lower().endswith(size) and pos[:-len(size)].replace(".", "", 1).isdigit() + if any(pos_lower.endswith(size) and pos[:-len(size)].replace(".", "", 1).isdigit() for size in ['%', 'kb', 'mb', 'gb', 'tb', 'kib', 'mib', 'gib', 'tib']): return True diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py index 8c7aed91..79ab024b 100644 --- a/archinstall/lib/general.py +++ b/archinstall/lib/general.py @@ -130,7 +130,10 @@ class JsonEncoder: copy[JsonEncoder._encode(key)] = val return copy elif hasattr(obj, 'json'): - return obj.json() + # json() is a friendly name for json-helper, it should return + # a dictionary representation of the object so that it can be + # processed by the json library. + return json.loads(json.dumps(obj.json(), cls=JSON)) elif hasattr(obj, '__dump__'): return obj.__dump__() elif isinstance(obj, (datetime, date)): @@ -185,12 +188,16 @@ class SysCommandWorker: def __init__(self, cmd :Union[str, List[str]], callbacks :Optional[Dict[str, Any]] = None, + peek_output :Optional[bool] = False, peak_output :Optional[bool] = False, environment_vars :Optional[Dict[str, Any]] = None, logfile :Optional[None] = None, working_directory :Optional[str] = './', remove_vt100_escape_codes_from_lines :bool = True): + if peak_output: + log("SysCommandWorker()'s peak_output is deprecated, use peek_output instead.", level=logging.WARNING, fg='red') + if not callbacks: callbacks = {} if not environment_vars: @@ -208,7 +215,9 @@ class SysCommandWorker: self.cmd = cmd self.callbacks = callbacks - self.peak_output = peak_output + self.peek_output = peek_output + if not self.peek_output and peak_output: + self.peek_output = peak_output # define the standard locale for command outputs. For now the C ascii one. Can be overridden self.environment_vars = {**storage.get('CMD_LOCALE',{}),**environment_vars} self.logfile = logfile @@ -262,7 +271,7 @@ class SysCommandWorker: except: pass - if self.peak_output: + if self.peek_output: # To make sure any peaked output didn't leave us hanging # on the same line we were on. sys.stdout.write("\n") @@ -307,7 +316,7 @@ class SysCommandWorker: self._trace_log_pos = min(max(0, pos), len(self._trace_log)) def peak(self, output: Union[str, bytes]) -> bool: - if self.peak_output: + if self.peek_output: if type(output) == bytes: try: output = output.decode('UTF-8') @@ -320,8 +329,8 @@ class SysCommandWorker: if peak_logfile.exists() is False: change_perm = True - with peak_logfile.open("a") as peak_output_log: - peak_output_log.write(output) + with peak_logfile.open("a") as peek_output_log: + peek_output_log.write(output) if change_perm: os.chmod(str(peak_logfile), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP) @@ -349,10 +358,12 @@ class SysCommandWorker: if self.ended or (got_output is False and pid_exists(self.pid) is False): self.ended = time.time() try: - self.exit_code = os.waitpid(self.pid, 0)[1] + wait_status = os.waitpid(self.pid, 0)[1] + self.exit_code = os.waitstatus_to_exitcode(wait_status) except ChildProcessError: try: - self.exit_code = os.waitpid(self.child_fd, 0)[1] + wait_status = os.waitpid(self.child_fd, 0)[1] + self.exit_code = os.waitstatus_to_exitcode(wait_status) except ChildProcessError: self.exit_code = 1 @@ -385,6 +396,13 @@ class SysCommandWorker: os.chmod(str(history_logfile), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP) except PermissionError: pass + # If history_logfile does not exist, ignore the error + except FileNotFoundError: + pass + except Exception as e: + exception_type = type(e).__name__ + log(f"Unexpected {exception_type} occurred in {self.cmd}: {e}", level=logging.ERROR) + raise e os.execve(self.cmd[0], list(self.cmd), {**os.environ, **self.environment_vars}) if storage['arguments'].get('debug'): @@ -412,11 +430,15 @@ class SysCommand: cmd :Union[str, List[str]], callbacks :Optional[Dict[str, Callable[[Any], Any]]] = None, start_callback :Optional[Callable[[Any], Any]] = None, + peek_output :Optional[bool] = False, peak_output :Optional[bool] = False, environment_vars :Optional[Dict[str, Any]] = None, working_directory :Optional[str] = './', remove_vt100_escape_codes_from_lines :bool = True): + if peak_output: + log("SysCommandWorker()'s peak_output is deprecated, use peek_output instead.", level=logging.WARNING, fg='red') + _callbacks = {} if callbacks: for hook, func in callbacks.items(): @@ -426,7 +448,9 @@ class SysCommand: self.cmd = cmd self._callbacks = _callbacks - self.peak_output = peak_output + self.peek_output = peek_output + if not self.peek_output and peak_output: + self.peek_output = peak_output self.environment_vars = environment_vars self.working_directory = working_directory self.remove_vt100_escape_codes_from_lines = remove_vt100_escape_codes_from_lines @@ -469,7 +493,7 @@ class SysCommand: return { 'cmd': self.cmd, 'callbacks': self._callbacks, - 'peak': self.peak_output, + 'peak': self.peek_output, 'environment_vars': self.environment_vars, 'session': True if self.session else False } @@ -478,7 +502,7 @@ class SysCommand: """ Initiates a :ref:`SysCommandWorker` session in this class ``.session``. It then proceeds to poll the process until it ends, after which it also - clears any printed output if ``.peak_output=True``. + clears any printed output if ``.peek_output=True``. """ if self.session: return self.session @@ -486,7 +510,7 @@ class SysCommand: with SysCommandWorker( self.cmd, callbacks=self._callbacks, - peak_output=self.peak_output, + peek_output=self.peek_output, environment_vars=self.environment_vars, remove_vt100_escape_codes_from_lines=self.remove_vt100_escape_codes_from_lines, working_directory=self.working_directory) as session: @@ -497,7 +521,7 @@ class SysCommand: while self.session.ended is None: self.session.poll() - if self.peak_output: + if self.peek_output: sys.stdout.write('\n') sys.stdout.flush() diff --git a/archinstall/lib/hsm/fido.py b/archinstall/lib/hsm/fido.py index 4cd956a3..1c226322 100644 --- a/archinstall/lib/hsm/fido.py +++ b/archinstall/lib/hsm/fido.py @@ -1,9 +1,11 @@ +from __future__ import annotations + import getpass import logging from dataclasses import dataclass from pathlib import Path -from typing import List +from typing import List, Dict from ..general import SysCommand, SysCommandWorker, clear_vt100_escape_codes from ..disk.partition import Partition @@ -16,6 +18,21 @@ class Fido2Device: manufacturer: str product: str + def json(self) -> Dict[str, str]: + return { + 'path': str(self.path), + 'manufacturer': self.manufacturer, + 'product': self.product + } + + @classmethod + def parse_arg(cls, arg: Dict[str, str]) -> 'Fido2Device': + return Fido2Device( + Path(arg['path']), + arg['manufacturer'], + arg['product'] + ) + class Fido2: _loaded: bool = False @@ -76,7 +93,7 @@ class Fido2: @classmethod def fido2_enroll(cls, hsm_device: Fido2Device, partition :Partition, password :str): - worker = SysCommandWorker(f"systemd-cryptenroll --fido2-device={hsm_device.path} {partition.real_device}", peak_output=True) + worker = SysCommandWorker(f"systemd-cryptenroll --fido2-device={hsm_device.path} {partition.real_device}", peek_output=True) pw_inputted = False pin_inputted = False diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py index 1926f593..f1c7b3db 100644 --- a/archinstall/lib/installer.py +++ b/archinstall/lib/installer.py @@ -132,6 +132,7 @@ class Installer: # if HSM is not used to encrypt the root volume. Check mkinitcpio() function for that override. self.HOOKS = ["base", "systemd", "autodetect", "keyboard", "sd-vconsole", "modconf", "block", "filesystems", "fsck"] self.KERNEL_PARAMS = [] + self.FSTAB_ENTRIES = [] self._zram_enabled = False @@ -198,7 +199,7 @@ class Installer: def _create_keyfile(self,luks_handle , partition :dict, password :str): """ roiutine to create keyfiles, so it can be moved elsewhere """ - if self._disk_encryption.generate_encryption_file(partition): + if self._disk_encryption and self._disk_encryption.generate_encryption_file(partition): if not (cryptkey_dir := pathlib.Path(f"{self.target}/etc/cryptsetup-keys.d")).exists(): cryptkey_dir.mkdir(parents=True) # Once we store the key as ../xyzloop.key systemd-cryptsetup can automatically load this key @@ -246,20 +247,21 @@ class Installer: mount_queue = {} # we manage the encrypted partititons - for partition in self._disk_encryption.partitions: - # open the luks device and all associate stuff - loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}" - - # note that we DON'T auto_unmount (i.e. close the encrypted device so it can be used - with (luks_handle := luks2(partition['device_instance'], loopdev, self._disk_encryption.encryption_password, auto_unmount=False)) as unlocked_device: - if self._disk_encryption.generate_encryption_file(partition) and not self._has_root(partition): - list_luks_handles.append([luks_handle, partition, self._disk_encryption.encryption_password]) - # this way all the requesrs will be to the dm_crypt device and not to the physical partition - partition['device_instance'] = unlocked_device - - if self._has_root(partition) and self._disk_encryption.generate_encryption_file(partition) is False: - if self._disk_encryption.hsm_device: - Fido2.fido2_enroll(self._disk_encryption.hsm_device, partition['device_instance'], self._disk_encryption.encryption_password) + if self._disk_encryption: + for partition in self._disk_encryption.all_partitions: + # open the luks device and all associate stuff + loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}" + + # note that we DON'T auto_unmount (i.e. close the encrypted device so it can be used + with (luks_handle := luks2(partition['device_instance'], loopdev, self._disk_encryption.encryption_password, auto_unmount=False)) as unlocked_device: + if self._disk_encryption.generate_encryption_file(partition) and not self._has_root(partition): + list_luks_handles.append([luks_handle, partition, self._disk_encryption.encryption_password]) + # this way all the requesrs will be to the dm_crypt device and not to the physical partition + partition['device_instance'] = unlocked_device + + if self._has_root(partition) and self._disk_encryption.generate_encryption_file(partition) is False: + if self._disk_encryption.hsm_device: + Fido2.fido2_enroll(self._disk_encryption.hsm_device, partition['device_instance'], self._disk_encryption.encryption_password) btrfs_subvolumes = [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', [])] @@ -292,7 +294,7 @@ class Installer: else: mount_queue[mountpoint] = lambda instance=partition['device_instance'], target=f"{self.target}{mountpoint}": instance.mount(target) - log(f"Using mount order: {list(sorted(mount_queue.items(), key=lambda item: item[0]))}", level=logging.INFO, fg="white") + log(f"Using mount order: {list(sorted(mount_queue.items(), key=lambda item: item[0]))}", level=logging.DEBUG, fg="white") # We mount everything by sorting on the mountpoint itself. for mountpoint, frozen_func in sorted(mount_queue.items(), key=lambda item: item[0]): @@ -317,6 +319,26 @@ class Installer: partition.mount(f'{self.target}{mountpoint}', options=options) + def add_swapfile(self, size='4G', enable_resume=True, file='/swapfile'): + if file[:1] != '/': + file = f"/{file}" + if len(file.strip()) <= 0 or file == '/': + raise ValueError(f"The filename for the swap file has to be a valid path, not: {self.target}{file}") + + SysCommand(f'dd if=/dev/zero of={self.target}{file} bs={size} count=1') + SysCommand(f'chmod 0600 {self.target}{file}') + SysCommand(f'mkswap {self.target}{file}') + + self.FSTAB_ENTRIES.append(f'{file} none swap defaults 0 0') + + if enable_resume: + resume_uuid = SysCommand(f'findmnt -no UUID -T {self.target}{file}').decode('UTF-8').strip() + resume_offset = SysCommand(f'/usr/bin/filefrag -v {self.target}{file}').decode('UTF-8').split('0:', 1)[1].split(":", 1)[1].split("..", 1)[0].strip() + + self.HOOKS.append('resume') + self.KERNEL_PARAMS.append(f'resume=UUID={resume_uuid}') + self.KERNEL_PARAMS.append(f'resume_offset={resume_offset}') + def post_install_check(self, *args :str, **kwargs :str) -> List[str]: return [step for step, flag in self.helper_flags.items() if flag is False] @@ -396,7 +418,8 @@ class Installer: raise RequirementError(f'Could not sync mirrors: {error}', level=logging.ERROR, fg="red") try: - return SysCommand(f'/usr/bin/pacstrap -C /etc/pacman.conf {self.target} {" ".join(packages)} --noconfirm', peak_output=True).exit_code == 0 + SysCommand(f'/usr/bin/pacstrap -C /etc/pacman.conf -K {self.target} {" ".join(packages)} --noconfirm', peek_output=True) + return True except SysCallError as error: self.log(f'Could not strap in packages: {error}', level=logging.ERROR, fg="red") @@ -417,8 +440,10 @@ class Installer: def genfstab(self, flags :str = '-pU') -> bool: self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO) - if not (fstab := SysCommand(f'/usr/bin/genfstab {flags} {self.target}')).exit_code == 0: - raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n Error: {fstab}') + try: + fstab = SysCommand(f'/usr/bin/genfstab {flags} {self.target}') + except SysCallError as error: + raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n Error: {error}') with open(f"{self.target}/etc/fstab", 'a') as fstab_fh: fstab_fh.write(fstab.decode()) @@ -431,6 +456,10 @@ class Installer: if plugin.on_genfstab(self) is True: break + with open(f"{self.target}/etc/fstab", 'a') as fstab_fh: + for entry in self.FSTAB_ENTRIES: + fstab_fh.write(f'{entry}\n') + return True def set_hostname(self, hostname: str, *args :str, **kwargs :str) -> None: @@ -463,7 +492,11 @@ class Installer: with open(f'{self.target}/etc/locale.conf', 'w') as fh: fh.write(f'LANG={locale}.{encoding}{modifier}\n') - return True if SysCommand(f'/usr/bin/arch-chroot {self.target} locale-gen').exit_code == 0 else False + try: + SysCommand(f'/usr/bin/arch-chroot {self.target} locale-gen') + return True + except SysCallError: + return False def set_timezone(self, zone :str, *args :str, **kwargs :str) -> bool: if not zone: @@ -519,8 +552,10 @@ class Installer: def enable_service(self, *services :str) -> None: for service in services: self.log(f'Enabling service {service}', level=logging.INFO) - if (output := self.arch_chroot(f'systemctl enable {service}')).exit_code != 0: - raise ServiceException(f"Unable to start service {service}: {output}") + try: + self.arch_chroot(f'systemctl enable {service}') + except SysCallError as error: + raise ServiceException(f"Unable to start service {service}: {error}") for plugin in plugins.values(): if hasattr(plugin, 'on_service'): @@ -641,12 +676,17 @@ class Installer: if plugin.on_mkinitcpio(self): return True + # mkinitcpio will error out if there's no vconsole. + if (vconsole := pathlib.Path(f"{self.target}/etc/vconsole.conf")).exists() is False: + with vconsole.open('w') as fh: + fh.write(f"KEYMAP={storage['arguments']['keyboard-layout']}\n") + with open(f'{self.target}/etc/mkinitcpio.conf', 'w') as mkinit: mkinit.write(f"MODULES=({' '.join(self.MODULES)})\n") mkinit.write(f"BINARIES=({' '.join(self.BINARIES)})\n") mkinit.write(f"FILES=({' '.join(self.FILES)})\n") - if not self._disk_encryption.hsm_device: + if self._disk_encryption and not self._disk_encryption.hsm_device: # For now, if we don't use HSM we revert to the old # way of setting up encryption hooks for mkinitcpio. # This is purely for stability reasons, we're going away from this. @@ -656,7 +696,11 @@ class Installer: mkinit.write(f"HOOKS=({' '.join(self.HOOKS)})\n") - return SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}').exit_code == 0 + try: + SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}') + return True + except SysCallError: + return False def minimal_installation( self, testing: bool = False, multilib: bool = False, @@ -690,7 +734,7 @@ class Installer: self.HOOKS.remove('fsck') if self.detect_encryption(partition): - if self._disk_encryption.hsm_device: + if self._disk_encryption and self._disk_encryption.hsm_device: # Required bby mkinitcpio to add support for fido2-device options self.pacstrap('libfido2') @@ -754,14 +798,6 @@ class Installer: # TODO: Use python functions for this SysCommand(f'/usr/bin/arch-chroot {self.target} chmod 700 /root') - if self._disk_encryption.hsm_device: - # TODO: - # A bit of a hack, but we need to get vconsole.conf in there - # before running `mkinitcpio` because it expects it in HSM mode. - if (vconsole := pathlib.Path(f"{self.target}/etc/vconsole.conf")).exists() is False: - with vconsole.open('w') as fh: - fh.write(f"KEYMAP={storage['arguments']['keyboard-layout']}\n") - self.mkinitcpio('-P') self.helper_flags['base'] = True @@ -841,60 +877,68 @@ class Installer: os.makedirs(f'{self.target}/boot/loader/entries') for kernel in self.kernels: - # Setup the loader entry - with open(f'{self.target}/boot/loader/entries/{self.init_time}_{kernel}.conf', 'w') as entry: - entry.write('# Created by: archinstall\n') - entry.write(f'# Created on: {self.init_time}\n') - entry.write(f'title Arch Linux ({kernel})\n') - entry.write(f"linux /vmlinuz-{kernel}\n") - if not is_vm(): - vendor = cpu_vendor() - if vendor == "AuthenticAMD": - entry.write("initrd /amd-ucode.img\n") - elif vendor == "GenuineIntel": - entry.write("initrd /intel-ucode.img\n") + for variant in ("", "-fallback"): + # Setup the loader entry + with open(f'{self.target}/boot/loader/entries/{self.init_time}_{kernel}{variant}.conf', 'w') as entry: + entry.write('# Created by: archinstall\n') + entry.write(f'# Created on: {self.init_time}\n') + entry.write(f'title Arch Linux ({kernel}{variant})\n') + entry.write(f"linux /vmlinuz-{kernel}\n") + if not is_vm(): + vendor = cpu_vendor() + if vendor == "AuthenticAMD": + entry.write("initrd /amd-ucode.img\n") + elif vendor == "GenuineIntel": + entry.write("initrd /intel-ucode.img\n") + else: + self.log(f"Unknown CPU vendor '{vendor}' detected. Archinstall won't add any ucode to systemd-boot config.", level=logging.DEBUG) + entry.write(f"initrd /initramfs-{kernel}{variant}.img\n") + # blkid doesn't trigger on loopback devices really well, + # so we'll use the old manual method until we get that sorted out. + root_fs_type = get_mount_fs_type(root_partition.filesystem) + + if root_fs_type is not None: + options_entry = f'rw rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}\n' else: - self.log(f"Unknown CPU vendor '{vendor}' detected. Archinstall won't add any ucode to systemd-boot config.", level=logging.DEBUG) - entry.write(f"initrd /initramfs-{kernel}.img\n") - # blkid doesn't trigger on loopback devices really well, - # so we'll use the old manual method until we get that sorted out. - root_fs_type = get_mount_fs_type(root_partition.filesystem) - - if root_fs_type is not None: - options_entry = f'rw rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}\n' - else: - options_entry = f'rw {" ".join(self.KERNEL_PARAMS)}\n' + options_entry = f'rw {" ".join(self.KERNEL_PARAMS)}\n' - for subvolume in root_partition.subvolumes: - if subvolume.root is True and subvolume.name != '<FS_TREE>': - options_entry = f"rootflags=subvol={subvolume.name} " + options_entry + for subvolume in root_partition.subvolumes: + if subvolume.root is True and subvolume.name != '<FS_TREE>': + options_entry = f"rootflags=subvol={subvolume.name} " + options_entry - # Zswap should be disabled when using zram. - # - # https://github.com/archlinux/archinstall/issues/881 - if self._zram_enabled: - options_entry = "zswap.enabled=0 " + options_entry + # Zswap should be disabled when using zram. + # + # https://github.com/archlinux/archinstall/issues/881 + if self._zram_enabled: + options_entry = "zswap.enabled=0 " + options_entry - if real_device := self.detect_encryption(root_partition): - # TODO: We need to detect if the encrypted device is a whole disk encryption, - # or simply a partition encryption. Right now we assume it's a partition (and we always have) - log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}/{real_device.part_uuid}'.", level=logging.DEBUG) + if real_device := self.detect_encryption(root_partition): + # TODO: We need to detect if the encrypted device is a whole disk encryption, + # or simply a partition encryption. Right now we assume it's a partition (and we always have) + log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}/{real_device.part_uuid}'.", level=logging.DEBUG) - kernel_options = f"options" + kernel_options = f"options" - if self._disk_encryption.hsm_device: + if self._disk_encryption.hsm_device: + # Note: lsblk UUID must be used, not PARTUUID for sd-encrypt to work + kernel_options += f" rd.luks.name={real_device.uuid}=luksdev" + # Note: tpm2-device and fido2-device don't play along very well: + # https://github.com/archlinux/archinstall/pull/1196#issuecomment-1129715645 + kernel_options += f" rd.luks.options=fido2-device=auto,password-echo=no" + else: + kernel_options += f" cryptdevice=PARTUUID={real_device.part_uuid}:luksdev" + + entry.write(f'{kernel_options} root=/dev/mapper/luksdev {options_entry}') + + if self._disk_encryption and self._disk_encryption.hsm_device: # Note: lsblk UUID must be used, not PARTUUID for sd-encrypt to work kernel_options += f" rd.luks.name={real_device.uuid}=luksdev" # Note: tpm2-device and fido2-device don't play along very well: # https://github.com/archlinux/archinstall/pull/1196#issuecomment-1129715645 kernel_options += f" rd.luks.options=fido2-device=auto,password-echo=no" else: - kernel_options += f" cryptdevice=PARTUUID={real_device.part_uuid}:luksdev" - - entry.write(f'{kernel_options} root=/dev/mapper/luksdev {options_entry}') - else: - log(f"Identifying root partition by PARTUUID on {root_partition}, looking for '{root_partition.part_uuid}'.", level=logging.DEBUG) - entry.write(f'options root=PARTUUID={root_partition.part_uuid} {options_entry}') + log(f"Identifying root partition by PARTUUID on {root_partition}, looking for '{root_partition.part_uuid}'.", level=logging.DEBUG) + entry.write(f'options root=PARTUUID={root_partition.part_uuid} {options_entry}') self.helper_flags['bootloader'] = "systemd" @@ -923,15 +967,15 @@ class Installer: if has_uefi(): self.pacstrap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead? try: - SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable', peak_output=True) + SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable', peek_output=True) except SysCallError: try: - SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable', peak_output=True) + SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable', peek_output=True) except SysCallError as error: raise DiskError(f"Could not install GRUB to {self.target}/boot: {error}") else: try: - SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=i386-pc --recheck {boot_partition.parent}', peak_output=True) + SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=i386-pc --recheck {boot_partition.parent}', peek_output=True) except SysCallError as error: raise DiskError(f"Could not install GRUB to {boot_partition.path}: {error}") @@ -1109,8 +1153,10 @@ class Installer: if not handled_by_plugin: self.log(f'Creating user {user}', level=logging.INFO) - if not (output := SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}')).exit_code == 0: - raise SystemError(f"Could not create user inside installation: {output}") + try: + SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}') + except SysCallError as error: + raise SystemError(f"Could not create user inside installation: {error}") for plugin in plugins.values(): if hasattr(plugin, 'on_user_created'): @@ -1138,17 +1184,28 @@ class Installer: echo = shlex.join(['echo', combo]) sh = shlex.join(['sh', '-c', echo]) - result = SysCommand(f"/usr/bin/arch-chroot {self.target} " + sh[:-1] + " | chpasswd'") - return result.exit_code == 0 + try: + SysCommand(f"/usr/bin/arch-chroot {self.target} " + sh[:-1] + " | chpasswd'") + return True + except SysCallError: + return False def user_set_shell(self, user :str, shell :str) -> bool: self.log(f'Setting shell for {user} to {shell}', level=logging.INFO) - return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"").exit_code == 0 + try: + SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"") + return True + except SysCallError: + return False def chown(self, owner :str, path :str, options :List[str] = []) -> bool: cleaned_path = path.replace('\'', '\\\'') - return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {cleaned_path}'").exit_code == 0 + try: + SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {cleaned_path}'") + return True + except SysCallError: + return False def create_file(self, filename :str, owner :Optional[str] = None) -> InstallationFile: return InstallationFile(self, filename, owner) @@ -1166,8 +1223,10 @@ class Installer: with Boot(self) as session: os.system('/usr/bin/systemd-run --machine=archinstall --pty localectl set-keymap ""') - if (output := session.SysCommand(["localectl", "set-keymap", language])).exit_code != 0: - raise ServiceException(f"Unable to set locale '{language}' for console: {output}") + try: + session.SysCommand(["localectl", "set-keymap", language]) + except SysCallError as error: + raise ServiceException(f"Unable to set locale '{language}' for console: {error}") self.log(f"Keyboard language for this installation is now set to: {language}") else: @@ -1190,8 +1249,10 @@ class Installer: with Boot(self) as session: session.SysCommand(["localectl", "set-x11-keymap", '""']) - if (output := session.SysCommand(["localectl", "set-x11-keymap", language])).exit_code != 0: - raise ServiceException(f"Unable to set locale '{language}' for X11: {output}") + try: + session.SysCommand(["localectl", "set-x11-keymap", language]) + except SysCallError as error: + raise ServiceException(f"Unable to set locale '{language}' for X11: {error}") else: self.log(f'X11-Keyboard language was not changed from default (no language specified).', fg="yellow", level=logging.INFO) diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py index 7e4534d8..ad6bf093 100644 --- a/archinstall/lib/luks.py +++ b/archinstall/lib/luks.py @@ -115,7 +115,7 @@ class luks2: if cmd_handle.exit_code != 0: raise DiskError(f'Could not encrypt volume "{partition.path}": {b"".join(cmd_handle)}') except SysCallError as err: - if err.exit_code == 256: + if err.exit_code == 1: log(f'{partition} is being used, trying to unmount and crypt-close the device and running one more attempt at encrypting the device.', level=logging.DEBUG) # Partition was in use, unmount it and try again partition.unmount() diff --git a/archinstall/lib/menu/abstract_menu.py b/archinstall/lib/menu/abstract_menu.py index 5a7ca03a..d659d709 100644 --- a/archinstall/lib/menu/abstract_menu.py +++ b/archinstall/lib/menu/abstract_menu.py @@ -17,14 +17,14 @@ class Selector: def __init__( self, description :str, - func :Callable = None, - display_func :Callable = None, + func :Optional[Callable] = None, + display_func :Optional[Callable] = None, default :Any = None, enabled :bool = False, dependencies :List = [], dependencies_not :List = [], - exec_func :Callable = None, - preview_func :Callable = None, + exec_func :Optional[Callable] = None, + preview_func :Optional[Callable] = None, mandatory :bool = False, no_store :bool = False ): @@ -165,7 +165,7 @@ class Selector: class AbstractMenu: - def __init__(self, data_store: Dict[str, Any] = None, auto_cursor=False, preview_size :float = 0.2): + def __init__(self, data_store: Optional[Dict[str, Any]] = None, auto_cursor=False, preview_size :float = 0.2): """ Create a new selection menu. @@ -226,7 +226,7 @@ class AbstractMenu: """ will be called before each action in the menu """ return - def post_callback(self, selection_name: str = None, value: Any = None): + def post_callback(self, selection_name: Optional[str] = None, value: Any = None): """ will be called after each action in the menu """ return True @@ -356,7 +356,7 @@ class AbstractMenu: config_name, selector = self._find_selection(selection_name) return self.exec_option(config_name, selector) - def exec_option(self, config_name :str, p_selector :Selector = None) -> bool: + def exec_option(self, config_name :str, p_selector :Optional[Selector] = None) -> bool: """ processes the execution of a given menu entry - pre process callback - selection function @@ -372,13 +372,13 @@ class AbstractMenu: self.pre_callback(config_name) result = None - if selector.func: + if selector.func is not None: presel_val = self.option(config_name).get_selection() result = selector.func(presel_val) self._menu_options[config_name].set_current_selection(result) if selector.do_store(): self._data_store[config_name] = result - exec_ret_val = selector.exec_func(config_name,result) if selector.exec_func else False + exec_ret_val = selector.exec_func(config_name,result) if selector.exec_func is not None else False self.post_callback(config_name,result) if exec_ret_val and self._check_mandatory_status(): @@ -478,7 +478,7 @@ class AbstractMenu: class AbstractSubMenu(AbstractMenu): - def __init__(self, data_store: Dict[str, Any] = None): + def __init__(self, data_store: Optional[Dict[str, Any]] = None): super().__init__(data_store=data_store) self._menu_options['__separator__'] = Selector('') diff --git a/archinstall/lib/menu/global_menu.py b/archinstall/lib/menu/global_menu.py index 0d348227..7c5b153e 100644 --- a/archinstall/lib/menu/global_menu.py +++ b/archinstall/lib/menu/global_menu.py @@ -197,11 +197,11 @@ class GlobalMenu(AbstractMenu): self._menu_options['abort'] = Selector(_('Abort'), exec_func=lambda n,v:exit(1)) - def _update_install_text(self, name :str = None, result :Any = None): + def _update_install_text(self, name :Optional[str] = None, result :Any = None): text = self._install_text() self._menu_options['install'].update_description(text) - def post_callback(self,name :str = None ,result :Any = None): + def post_callback(self,name :Optional[str] = None ,result :Any = None): self._update_install_text(name, result) def _install_text(self): @@ -279,8 +279,8 @@ class GlobalMenu(AbstractMenu): output = str(_('Encryption type')) + f': {enc_type}\n' output += str(_('Password')) + f': {secret(encryption.encryption_password)}\n' - if encryption.partitions: - output += 'Partitions: {} selected'.format(len(encryption.partitions)) + '\n' + if encryption.all_partitions: + output += 'Partitions: {} selected'.format(len(encryption.all_partitions)) + '\n' if encryption.hsm_device: output += f'HSM: {encryption.hsm_device.manufacturer}' @@ -377,9 +377,9 @@ class GlobalMenu(AbstractMenu): return harddrives - def _select_profile(self, preset): + def _select_profile(self, preset) -> Optional[Profile]: + ret: Optional[Profile] = None profile = select_profile(preset) - ret = None if profile is None: if any([ @@ -403,7 +403,7 @@ class GlobalMenu(AbstractMenu): namespace = f'{profile.namespace}.py' with profile.load_instructions(namespace=namespace) as imported: if imported._prep_function(servers=servers, desktop=desktop, desktop_env=desktop_env, gfx_driver=gfx_driver): - ret: Profile = profile + ret = profile match ret.name: case 'minimal': diff --git a/archinstall/lib/models/disk_encryption.py b/archinstall/lib/models/disk_encryption.py index 3edab93e..a4a501d9 100644 --- a/archinstall/lib/models/disk_encryption.py +++ b/archinstall/lib/models/disk_encryption.py @@ -1,5 +1,7 @@ +from __future__ import annotations + from dataclasses import dataclass, field -from enum import Enum, auto +from enum import Enum from typing import Optional, List, Dict, TYPE_CHECKING, Any from ..hsm.fido import Fido2Device @@ -9,8 +11,7 @@ if TYPE_CHECKING: class EncryptionType(Enum): - Partition = auto() - # FullDiskEncryption = auto() + Partition = 'partition' @classmethod def _encryption_type_mapper(cls) -> Dict[str, 'EncryptionType']: @@ -35,9 +36,55 @@ class EncryptionType(Enum): class DiskEncryption: encryption_type: EncryptionType = EncryptionType.Partition encryption_password: str = '' - partitions: List[str] = field(default_factory=list) + partitions: Dict[str, List[Dict[str, Any]]] = field(default_factory=dict) hsm_device: Optional[Fido2Device] = None + @property + def all_partitions(self) -> List[Dict[str, Any]]: + _all: List[Dict[str, Any]] = [] + for parts in self.partitions.values(): + _all += parts + return _all + def generate_encryption_file(self, partition) -> bool: - return partition in self.partitions and partition['mountpoint'] != '/' - + return partition in self.all_partitions and partition['mountpoint'] != '/' + + def json(self) -> Dict[str, Any]: + obj = { + 'encryption_type': self.encryption_type.value, + 'partitions': self.partitions + } + + if self.hsm_device: + obj['hsm_device'] = self.hsm_device.json() + + return obj + + @classmethod + def parse_arg( + cls, + disk_layout: Dict[str, Any], + arg: Dict[str, Any], + password: str = '' + ) -> 'DiskEncryption': + # we have to map the enc partition config to the disk layout objects + # they both need to point to the same object as it will get modified + # during the installation process + enc_partitions: Dict[str, List[Dict[str, Any]]] = {} + + for path, partitions in disk_layout.items(): + conf_partitions = arg['partitions'].get(path, []) + for part in partitions['partitions']: + if part in conf_partitions: + enc_partitions.setdefault(path, []).append(part) + + enc = DiskEncryption( + EncryptionType(arg['encryption_type']), + password, + enc_partitions + ) + + if hsm := arg.get('hsm_device', None): + enc.hsm_device = Fido2Device.parse_arg(hsm) + + return enc diff --git a/archinstall/lib/packages/packages.py b/archinstall/lib/packages/packages.py index d5834f43..0743e83b 100644 --- a/archinstall/lib/packages/packages.py +++ b/archinstall/lib/packages/packages.py @@ -1,3 +1,4 @@ +import dataclasses import json import ssl from typing import Dict, Any, Tuple, List @@ -112,4 +113,4 @@ def installed_package(package :str) -> LocalPackage: except SysCallError: pass - return LocalPackage(**package_info) + return LocalPackage({field.name: package_info.get(field.name) for field in dataclasses.fields(LocalPackage)}) diff --git a/archinstall/lib/systemd.py b/archinstall/lib/systemd.py index f459f94b..64ffcae4 100644 --- a/archinstall/lib/systemd.py +++ b/archinstall/lib/systemd.py @@ -97,8 +97,6 @@ class Boot: shutdown = SysCommand(f'systemd-run --machine={self.container_name} --pty shutdown now') except SysCallError as error: shutdown_exit_code = error.exit_code - # if error.exit_code == 256: - # pass while self.session.is_alive(): time.sleep(0.25) diff --git a/archinstall/lib/user_interaction/general_conf.py b/archinstall/lib/user_interaction/general_conf.py index 76631a98..fc7ded45 100644 --- a/archinstall/lib/user_interaction/general_conf.py +++ b/archinstall/lib/user_interaction/general_conf.py @@ -174,7 +174,10 @@ def select_profile(preset) -> Optional[Profile]: storage['profile_minimal'] = False storage['_selected_servers'] = [] storage['_desktop_profile'] = None + storage['sway_sys_priv_ctrl'] = None + storage['arguments']['sway_sys_priv_ctrl'] = None storage['arguments']['desktop-environment'] = None + storage['arguments']['gfx_driver'] = None storage['arguments']['gfx_driver_packages'] = None return None case MenuSelectionType.Skip: diff --git a/archinstall/lib/user_interaction/partitioning_conf.py b/archinstall/lib/user_interaction/partitioning_conf.py index cff76dc2..0a5ede51 100644 --- a/archinstall/lib/user_interaction/partitioning_conf.py +++ b/archinstall/lib/user_interaction/partitioning_conf.py @@ -208,7 +208,7 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, if fs_choice.type_ == MenuSelectionType.Skip: continue - prompt = str(_('Enter the start sector (percentage or block number, default: {}): ')).format( + prompt = str(_('Enter the start location (in parted units: s, GB, %, etc. ; default: {}): ')).format( block_device.first_free_sector ) start = input(prompt).strip() @@ -219,7 +219,7 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, else: end_suggested = '100%' - prompt = str(_('Enter the end sector of the partition (percentage or block number, ex: {}): ')).format( + prompt = str(_('Enter the end location (in parted units: s, GB, %, etc. ; ex: {}): ')).format( end_suggested ) end = input(prompt).strip() diff --git a/archinstall/lib/user_interaction/save_conf.py b/archinstall/lib/user_interaction/save_conf.py index d60ef995..5b4ae2b3 100644 --- a/archinstall/lib/user_interaction/save_conf.py +++ b/archinstall/lib/user_interaction/save_conf.py @@ -1,9 +1,12 @@ from __future__ import annotations +import logging + from pathlib import Path from typing import Any, Dict, TYPE_CHECKING from ..configuration import ConfigurationOutput +from ..general import SysCommand from ..menu import Menu from ..menu.menu import MenuSelectionType from ..output import log @@ -58,20 +61,75 @@ def save_config(config: Dict): if choice.type_ == MenuSelectionType.Skip: return - while True: - path = input(_('Enter a directory for the configuration(s) to be saved: ')).strip(' ') - dest_path = Path(path) - if dest_path.exists() and dest_path.is_dir(): - break - log(_('Not a valid directory: {}').format(dest_path), fg='red') - - if options['user_config'] == choice.value: - config_output.save_user_config(dest_path) - elif options['user_creds'] == choice.value: - config_output.save_user_creds(dest_path) - elif options['disk_layout'] == choice.value: - config_output.save_disk_layout(dest_path) - elif options['all'] == choice.value: - config_output.save_user_config(dest_path) - config_output.save_user_creds(dest_path) - config_output.save_disk_layout(dest_path) + dirs_to_exclude = [ + '/bin', + '/dev', + '/lib', + '/lib64', + '/lost+found', + '/opt', + '/proc', + '/run', + '/sbin', + '/srv', + '/sys', + '/usr', + '/var', + ] + log( + _('When picking a directory to save configuration files to,' + ' by default we will ignore the following folders: ') + ','.join(dirs_to_exclude), + level=logging.DEBUG + ) + + log(_('Finding possible directories to save configuration files ...'), level=logging.INFO) + + find_exclude = '-path ' + ' -prune -o -path '.join(dirs_to_exclude) + ' -prune ' + file_picker_command = f'find / {find_exclude} -o -type d -print0' + possible_save_dirs = list( + filter(None, SysCommand(file_picker_command).decode().split('\x00')) + ) + + selection = Menu( + _('Select directory (or directories) for saving configuration files'), + possible_save_dirs, + multi=True, + skip=True, + allow_reset=False, + ).run() + + match selection.type_: + case MenuSelectionType.Skip: + return + case _: + save_dirs = selection.value + + prompt = _('Do you want to save {} configuration file(s) in the following locations?\n\n{}').format( + list(options.keys())[list(options.values()).index(choice.value)], + save_dirs + ) + save_confirmation = Menu(prompt, Menu.yes_no(), default_option=Menu.yes()).run() + if save_confirmation == Menu.no(): + return + + log( + _('Saving {} configuration files to {}').format( + list(options.keys())[list(options.values()).index(choice.value)], + save_dirs + ), + level=logging.DEBUG + ) + + if save_dirs is not None: + for save_dir_str in save_dirs: + save_dir = Path(save_dir_str) + if options['user_config'] == choice.value: + config_output.save_user_config(save_dir) + elif options['user_creds'] == choice.value: + config_output.save_user_creds(save_dir) + elif options['disk_layout'] == choice.value: + config_output.save_disk_layout(save_dir) + elif options['all'] == choice.value: + config_output.save_user_config(save_dir) + config_output.save_user_creds(save_dir) + config_output.save_disk_layout(save_dir) diff --git a/archinstall/lib/user_interaction/system_conf.py b/archinstall/lib/user_interaction/system_conf.py index 8454a3da..68a1a7d2 100644 --- a/archinstall/lib/user_interaction/system_conf.py +++ b/archinstall/lib/user_interaction/system_conf.py @@ -60,7 +60,6 @@ def select_harddrives(preset: List[str] = []) -> List[str]: selected_harddrive = Menu( title, list(options.keys()), - preset_values=preset, multi=True, allow_reset=True, allow_reset_warning_msg=warning |