From 1ae1f2ff1144d502830834ba5a64262f7d195e91 Mon Sep 17 00:00:00 2001 From: Himadri Bhattacharjee <107522312+lavafroth@users.noreply.github.com> Date: Wed, 28 Jun 2023 11:42:53 +0000 Subject: Refactor installer and general design patterns (#1895) * fix: refactor clear_vt100_escape_codes * fix: check for structure being a dict after handling potential parsing errors * refactor: use short circuit logic than if-elif-else chains * fix: use or for nullish moutpoint attribute * fix: better error handling for JSON from urls and paths * chore: json_stream_to_structure documentation * refactor: dry up relative and chroot path for custom command scripts * refactor: use write_text for pathlib.Path object * refactor: use sets to find intersection instead of filter and list * refactor: replace loop with dictionary comprehension in preparing luks partition * refactor: use walrus operator to check if luks_handler exists * refactor: use read_text and splitlines for potential Path object * fix: use keepends in splitlines for compatibility * fix: use keepends in splitlines for compatibility * feat: set pacman_conf Path as an attribute of installer * fix: empty string is a part of any string, avoid tuples * refactor: use iterator patterns to uncomment multilib and testing blocks * fix: don't json.loads an already loaded structure * fix: use fstab_path uniformly in genfstab * fix: remove unused variable matched * refactor: create separate class to modify pacman.conf in a single pass * fix: remove unused attribute pacman_conf from installer * fix: remove unused attribute pacman_conf from installer * feat: add persist method for pacman.conf, rewrite only when needed * fix: use path.write_text for locale.conf * use `or` operator for nullish new_conf * refactor: Installer.target is always a pathlib.Path object, do not check for string type * fix: use Optional[str] in function type definition instead of sumtype of str and None * fix: mypy type annotation * fix: make flake8 happy * chore: move pacman config and repo into pacman module * refactor: use Pacman object instead of Installer's pacstrap method * fix: break after first sync * fix: keep old build script for now * use nullish operator for base_packages and disk_encryption of Installer * feat: use shutil.which instead of rolling our own implementation * fix: check for binary only if list is not empty * fix: import Enum and fix mypy errors * refactor: use nullish operator for default values * refactor: linear search for key in Installer._trace_log only once * fix: use logs instead of the entirety of self._trace_log when searching for key * refactor: do not copy slice of bytes for search * refactor: use rfind only once to iterate over logs, do not raise ValueError in clear_vt100_escape_codes since TYPE_CHECKING will take care of it. * refactor: try decoding trace log before falling back to strigification * refactor: use an empty dict as default for callbacks in SysCommand.__init__ * refactor: use nullish or operator for slice start and end when not specified * refactor: use nullish or operator for SysCommand session * refactor: use pre-existing decode method in __repr__ for SysCommand * fix: overindentation * fix: use shallow copy of callbacks to prevent mutating the key-value relationships of the argument dict * refactor: use truthy value of self.session is not None for json encoding SysCommand * refactor: directly assign to SysCommand.session in create_session since it short circuits to True if already present * refactor: use dict.items() instead of manually retrieving the value using the key * refactor: user_config_to_json method sounds pretty self explanatory * refactor: store path validity as boolean for return * refactor: use pathlib.Path.write_text to write configs to destinations * fix: cannot use assignment expressions with expression * fix: use config_output.save for saving both config and creds * refactor: switch dictionary keys and values for options to avoid redundancy * refactor: use itertools.takewhile to collect locale.gen entries until the empty line * refactor: use iterative approach for nvidia driver fix * refactor: install packages if not nvidia * refactor: return early if no profile is selected * refactor: use strip to remove commented lines * fix: install additional packages only when we have a driver * fix: path with one command is matched as relative to '.' * fix: remove translation for debug log --------- Co-authored-by: Anton Hvornum --- archinstall/lib/installer.py | 205 +++++++++++-------------------------------- 1 file changed, 51 insertions(+), 154 deletions(-) (limited to 'archinstall/lib/installer.py') diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py index f1a7f71a..ee546993 100644 --- a/archinstall/lib/installer.py +++ b/archinstall/lib/installer.py @@ -20,7 +20,8 @@ from .models.bootloader import Bootloader from .models.network_configuration import NetworkConfiguration from .models.users import User from .output import log, error, info, warn, debug -from .pacman import run_pacman +from . import pacman +from .pacman import Pacman from .plugins import plugins from .storage import storage @@ -52,27 +53,16 @@ class Installer: `Installer()` is the wrapper for most basic installation steps. It also wraps :py:func:`~archinstall.Installer.pacstrap` among other things. """ - if not base_packages: - base_packages = __packages__[:3] - - if kernels is None: - self.kernels = ['linux'] - else: - self.kernels = kernels - + self.base_packages = base_packages or __packages__[:3] + self.kernels = kernels or ['linux'] self._disk_config = disk_config - if disk_encryption is None: - self._disk_encryption = disk.DiskEncryption(disk.EncryptionType.NoEncryption) - else: - self._disk_encryption = disk_encryption - + self._disk_encryption = disk_encryption or disk.DiskEncryption(disk.EncryptionType.NoEncryption) self.target: Path = target self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S') self.milliseconds = int(str(time.time()).split('.')[1]) self.helper_flags: Dict[str, Any] = {'base': False, 'bootloader': None} - self.base_packages = base_packages for kernel in self.kernels: self.base_packages.append(kernel) @@ -101,6 +91,7 @@ class Installer: self._fstab_entries: List[str] = [] self._zram_enabled = False + self.pacman = Pacman(self.target, storage['arguments'].get('silent', False)) def __enter__(self) -> 'Installer': return self @@ -189,35 +180,33 @@ class Installer: # partitions have to mounted in the right order on btrfs the mountpoint will # be empty as the actual subvolumes are getting mounted instead so we'll use # '/' just for sorting - sorted_part_mods = sorted(mod.partitions, key=lambda x: x.mountpoint if x.mountpoint else Path('/')) + sorted_part_mods = sorted(mod.partitions, key=lambda x: x.mountpoint or Path('/')) + enc_partitions = [] if self._disk_encryption.encryption_type is not disk.EncryptionType.NoEncryption: - enc_partitions = list(filter(lambda x: x in self._disk_encryption.partitions, sorted_part_mods)) - else: - enc_partitions = [] + enc_partitions = list(set(sorted_part_mods) & set(self._disk_encryption.partitions)) # attempt to decrypt all luks partitions luks_handlers = self._prepare_luks_partitions(enc_partitions) for part_mod in sorted_part_mods: - if part_mod not in luks_handlers: # partition is not encrypted + if luks_handler := luks_handlers.get(part_mod): + # mount encrypted partition + self._mount_luks_partiton(part_mod, luks_handler) + else: + # partition is not encrypted self._mount_partition(part_mod) - else: # mount encrypted partition - self._mount_luks_partiton(part_mod, luks_handlers[part_mod]) def _prepare_luks_partitions(self, partitions: List[disk.PartitionModification]) -> Dict[disk.PartitionModification, Luks2]: - luks_handlers = {} - - for part_mod in partitions: - if part_mod.mapper_name and part_mod.dev_path: - luks_handler = disk.device_handler.unlock_luks2_dev( - part_mod.dev_path, - part_mod.mapper_name, - self._disk_encryption.encryption_password - ) - luks_handlers[part_mod] = luks_handler - - return luks_handlers + return { + part_mod: disk.device_handler.unlock_luks2_dev( + part_mod.dev_path, + part_mod.mapper_name, + self._disk_encryption.encryption_password + ) + for part_mod in partitions + if part_mod.mapper_name and part_mod.dev_path + } def _mount_partition(self, part_mod: disk.PartitionModification): # it would be none if it's btrfs as the subvolumes will have the mountpoints defined @@ -302,93 +291,6 @@ class Installer: def post_install_check(self, *args :str, **kwargs :str) -> List[str]: return [step for step, flag in self.helper_flags.items() if flag is False] - def enable_multilib_repository(self): - # Set up a regular expression pattern of a commented line containing 'multilib' within [] - pattern = re.compile(r"^#\s*\[multilib\]$") - - # This is used to track if the previous line is a match, so we end up uncommenting the line after the block. - matched = False - - # Read in the lines from the original file - with open("/etc/pacman.conf", "r") as pacman_conf: - lines = pacman_conf.readlines() - - # Open the file again in write mode, to replace the contents - with open("/etc/pacman.conf", "w") as pacman_conf: - for line in lines: - if pattern.match(line): - # If this is the [] block containing 'multilib', uncomment it and set the matched tracking boolean. - pacman_conf.write(line.lstrip('#')) - matched = True - elif matched: - # The previous line was a match for [.*multilib.*]. - # This means we're on a line that looks like '#Include = /etc/pacman.d/mirrorlist' - pacman_conf.write(line.lstrip('#')) - matched = False # Reset the state of matched to False. - else: - pacman_conf.write(line) - - def enable_testing_repositories(self, enable_multilib_testing=False): - # Set up a regular expression pattern of a commented line containing 'testing' within [] - pattern = re.compile("^#\\[.*testing.*\\]$") - - # This is used to track if the previous line is a match, so we end up uncommenting the line after the block. - matched = False - - # Read in the lines from the original file - with open("/etc/pacman.conf", "r") as pacman_conf: - lines = pacman_conf.readlines() - - # Open the file again in write mode, to replace the contents - with open("/etc/pacman.conf", "w") as pacman_conf: - for line in lines: - if pattern.match(line) and (enable_multilib_testing or 'multilib' not in line): - # If this is the [] block containing 'testing', uncomment it and set the matched tracking boolean. - pacman_conf.write(line.lstrip('#')) - matched = True - elif matched: - # The previous line was a match for [.*testing.*]. - # This means we're on a line that looks like '#Include = /etc/pacman.d/mirrorlist' - pacman_conf.write(line.lstrip('#')) - matched = False # Reset the state of matched to False. - else: - pacman_conf.write(line) - - def _pacstrap(self, packages: Union[str, List[str]]) -> bool: - if isinstance(packages, str): - packages = [packages] - - for plugin in plugins.values(): - if hasattr(plugin, 'on_pacstrap'): - if (result := plugin.on_pacstrap(packages)): - packages = result - - info(f'Installing packages: {packages}') - - # TODO: We technically only need to run the -Syy once. - try: - run_pacman('-Syy', default_cmd='/usr/bin/pacman') - except SysCallError as err: - error(f'Could not sync a new package database: {err}') - - if storage['arguments'].get('silent', False) is False: - if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'): - return self._pacstrap(packages) - - raise RequirementError(f'Could not sync mirrors: {err}') - - try: - SysCommand(f'/usr/bin/pacstrap -C /etc/pacman.conf -K {self.target} {" ".join(packages)} --noconfirm', peek_output=True) - return True - except SysCallError as err: - error(f'Could not strap in packages: {err}') - - if storage['arguments'].get('silent', False) is False: - if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'): - return self._pacstrap(packages) - - raise RequirementError("Pacstrap failed. See /var/log/archinstall/install.log or above message for error details.") - def set_mirrors(self, mirror_config: MirrorConfiguration): for plugin in plugins.values(): if hasattr(plugin, 'on_mirrors'): @@ -402,7 +304,8 @@ class Installer: add_custom_mirrors(mirror_config.custom_mirrors) def genfstab(self, flags :str = '-pU'): - info(f"Updating {self.target}/etc/fstab") + fstab_path = self.target / "etc" / "fstab" + info(f"Updating {fstab_path}") try: gen_fstab = SysCommand(f'/usr/bin/genfstab {flags} {self.target}').decode() @@ -412,10 +315,10 @@ class Installer: if not gen_fstab: raise RequirementError(f'Genrating fstab returned empty value') - with open(f"{self.target}/etc/fstab", 'a') as fp: + with open(fstab_path, 'a') as fp: fp.write(gen_fstab) - if not os.path.isfile(f'{self.target}/etc/fstab'): + if not fstab_path.is_file(): raise RequirementError(f'Could not create fstab file') for plugin in plugins.values(): @@ -423,7 +326,7 @@ class Installer: if plugin.on_genfstab(self) is True: break - with open(f"{self.target}/etc/fstab", 'a') as fp: + with open(fstab_path, 'a') as fp: for entry in self._fstab_entries: fp.write(f'{entry}\n') @@ -432,9 +335,7 @@ class Installer: if part_mod.fs_type != disk.FilesystemType.Btrfs: continue - fstab_file = Path(f'{self.target}/etc/fstab') - - with fstab_file.open('r') as fp: + with fstab_path.open('r') as fp: fstab = fp.readlines() # Replace the {installation}/etc/fstab with entries @@ -456,7 +357,7 @@ class Installer: fstab[index] = line.replace(subvoldef[0], f',compress=zstd{subvoldef[0]}') break - with fstab_file.open('w') as fp: + with fstab_path.open('w') as fp: fp.writelines(fstab) def set_hostname(self, hostname: str, *args :str, **kwargs :str) -> None: @@ -486,8 +387,7 @@ class Installer: with open(f'{self.target}/etc/locale.gen', 'a') as fh: fh.write(f'{lang}.{encoding}{modifier} {encoding}\n') - with open(f'{self.target}/etc/locale.conf', 'w') as fh: - fh.write(f'LANG={lang}.{encoding}{modifier}\n') + (self.target / "etc" / "locale.conf").write_text(f'LANG={lang}.{encoding}{modifier}\n') try: SysCommand(f'/usr/bin/arch-chroot {self.target} locale-gen') @@ -561,16 +461,13 @@ class Installer: for plugin in plugins.values(): if hasattr(plugin, 'on_configure_nic'): - new_conf = plugin.on_configure_nic( + conf = plugin.on_configure_nic( network_config.iface, network_config.dhcp, network_config.ip, network_config.gateway, network_config.dns - ) - - if new_conf: - conf = new_conf + ) or conf with open(f"{self.target}/etc/systemd/network/10-{network_config.iface}.network", "a") as netconf: netconf.write(str(conf)) @@ -597,7 +494,7 @@ class Installer: # Otherwise, we can go ahead and add the required package # and enable it's service: else: - self._pacstrap('iwd') + self.pacman.strap('iwd') self.enable_service('iwd') for psk in psk_files: @@ -683,7 +580,7 @@ class Installer: if part in self._disk_encryption.partitions: if self._disk_encryption.hsm_device: # Required bby mkinitcpio to add support for fido2-device options - self._pacstrap('libfido2') + self.pacman.strap('libfido2') if 'sd-encrypt' not in self._hooks: self._hooks.insert(self._hooks.index('filesystems'), 'sd-encrypt') @@ -709,24 +606,27 @@ class Installer: # Determine whether to enable multilib/testing repositories before running pacstrap if testing flag is set. # This action takes place on the host system as pacstrap copies over package repository lists. + pacman_conf = pacman.Config(self.target) if multilib: info("The multilib flag is set. This system will be installed with the multilib repository enabled.") - self.enable_multilib_repository() + pacman_conf.enable(pacman.Repo.Multilib) else: info("The multilib flag is not set. This system will be installed without multilib repositories enabled.") if testing: info("The testing flag is set. This system will be installed with testing repositories enabled.") - self.enable_testing_repositories(multilib) + pacman_conf.enable(pacman.Repo.Testing) + if multilib: + pacman_conf.enable(pacman.Repo.MultilibTesting) else: info("The testing flag is not set. This system will be installed without testing repositories enabled.") - self._pacstrap(self.base_packages) + pacman_conf.apply() + + self.pacman.strap(self.base_packages) self.helper_flags['base-strapped'] = True - # This handles making sure that the repositories we enabled persist on the installed system - if multilib or testing: - shutil.copy2("/etc/pacman.conf", f"{self.target}/etc/pacman.conf") + pacman_conf.persist() # Periodic TRIM may improve the performance and longevity of SSDs whilst # having no adverse effect on other devices. Most distributions enable @@ -761,7 +661,7 @@ class Installer: def setup_swap(self, kind :str = 'zram'): if kind == 'zram': info(f"Setting up swap on zram") - self._pacstrap('zram-generator') + self.pacman.strap('zram-generator') # We could use the default example below, but maybe not the best idea: https://github.com/archlinux/archinstall/pull/678#issuecomment-962124813 # zram_example_location = '/usr/share/doc/zram-generator/zram-generator.conf.example' @@ -788,7 +688,7 @@ class Installer: return None def _add_systemd_bootloader(self, root_partition: disk.PartitionModification): - self._pacstrap('efibootmgr') + self.pacman.strap('efibootmgr') if not SysInfo.has_uefi(): raise HardwareIncompatibilityError @@ -897,7 +797,7 @@ class Installer: boot_partition: disk.PartitionModification, root_partition: disk.PartitionModification ): - self._pacstrap('grub') # no need? + self.pacman.strap('grub') # no need? _file = "/etc/default/grub" @@ -916,7 +816,7 @@ class Installer: info(f"GRUB boot partition: {boot_partition.dev_path}") if SysInfo.has_uefi(): - self._pacstrap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead? + self.pacman.strap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead? try: SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable', peek_output=True) @@ -955,7 +855,7 @@ class Installer: boot_partition: disk.PartitionModification, root_partition: disk.PartitionModification ): - self._pacstrap('efibootmgr') + self.pacman.strap('efibootmgr') if not SysInfo.has_uefi(): raise HardwareIncompatibilityError @@ -1030,9 +930,6 @@ class Installer: if plugin.on_add_bootloader(self): return True - if type(self.target) == str: - self.target = Path(self.target) - boot_partition = self._get_boot_partition() root_partition = self._get_root_partition() @@ -1053,7 +950,7 @@ class Installer: self._add_efistub_bootloader(boot_partition, root_partition) def add_additional_packages(self, packages: Union[str, List[str]]) -> bool: - return self._pacstrap(packages) + return self.pacman.strap(packages) def _enable_users(self, service: str, users: List[User]): for user in users: @@ -1214,7 +1111,7 @@ class Installer: return True - def _service_started(self, service_name: str) -> str | None: + def _service_started(self, service_name: str) -> Optional[str]: if os.path.splitext(service_name)[1] not in ('.service', '.target', '.timer'): service_name += '.service' # Just to be safe -- cgit v1.2.3-54-g00ecf