Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/installer.py
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/installer.py')
-rw-r--r--archinstall/lib/installer.py1881
1 files changed, 1129 insertions, 752 deletions
diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py
index f1c7b3db..8292a3be 100644
--- a/archinstall/lib/installer.py
+++ b/archinstall/lib/installer.py
@@ -1,323 +1,427 @@
-import time
-import logging
+import glob
import os
import re
-import shutil
import shlex
-import pathlib
+import shutil
import subprocess
-import glob
-from types import ModuleType
-from typing import Union, Dict, Any, List, Optional, Iterator, Mapping, TYPE_CHECKING
-from .disk import get_partitions_in_use, Partition
-from .general import SysCommand, generate_password
-from .hardware import has_uefi, is_vm, cpu_vendor
-from .locale_helpers import verify_keyboard_layout, verify_x11_keyboard_layout
-from .disk.helpers import findmnt
-from .mirrors import use_mirrors
-from .models.disk_encryption import DiskEncryption
-from .plugins import plugins
-from .storage import storage
-from .output import log
-from .profiles import Profile
-from .disk.partition import get_mount_fs_type
+import time
+from pathlib import Path
+from typing import Any, List, Optional, TYPE_CHECKING, Union, Dict, Callable
+
+from . import disk
from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError, SysCallError
+from .general import SysCommand
+from .hardware import SysInfo
+from .locale import LocaleConfiguration
+from .locale import verify_keyboard_layout, verify_x11_keyboard_layout
+from .luks import Luks2
+from .mirrors import MirrorConfiguration
+from .models.bootloader import Bootloader
+from .models.network_configuration import Nic
from .models.users import User
-from .models.subvolume import Subvolume
-from .hsm import Fido2
+from .output import log, error, info, warn, debug
+from . import pacman
+from .pacman import Pacman
+from .plugins import plugins
+from .storage import storage
if TYPE_CHECKING:
_: Any
-
# Any package that the Installer() is responsible for (optional and the default ones)
__packages__ = ["base", "base-devel", "linux-firmware", "linux", "linux-lts", "linux-zen", "linux-hardened"]
# Additional packages that are installed if the user is running the Live ISO with accessibility tools enabled
__accessibility_packages__ = ["brltty", "espeakup", "alsa-utils"]
-from .pacman import run_pacman
-from .models.network_configuration import NetworkConfiguration
-
-
-class InstallationFile:
- def __init__(self, installation :'Installer', filename :str, owner :str, mode :str = "w"):
- self.installation = installation
- self.filename = filename
- self.owner = owner
- self.mode = mode
- self.fh = None
-
- def __enter__(self) -> 'InstallationFile':
- self.fh = open(self.filename, self.mode)
- return self
-
- def __exit__(self, *args :str) -> None:
- self.fh.close()
- self.installation.chown(self.owner, self.filename)
-
- def write(self, data: Union[str, bytes]) -> int:
- return self.fh.write(data)
-
- def read(self, *args) -> Union[str, bytes]:
- return self.fh.read(*args)
-
-# def poll(self, *args) -> bool:
-# return self.fh.poll(*args)
-
def accessibility_tools_in_use() -> bool:
return os.system('systemctl is-active --quiet espeakup.service') == 0
class Installer:
- """
- `Installer()` is the wrapper for most basic installation steps.
- It also wraps :py:func:`~archinstall.Installer.pacstrap` among other things.
-
- :param partition: Requires a partition as the first argument, this is
- so that the installer can mount to `mountpoint` and strap packages there.
- :type partition: class:`archinstall.Partition`
-
- :param boot_partition: There's two reasons for needing a boot partition argument,
- The first being so that `mkinitcpio` can place the `vmlinuz` kernel at the right place
- during the `pacstrap` or `linux` and the base packages for a minimal installation.
- The second being when :py:func:`~archinstall.Installer.add_bootloader` is called,
- A `boot_partition` must be known to the installer before this is called.
- :type boot_partition: class:`archinstall.Partition`
-
- :param profile: A profile to install, this is optional and can be called later manually.
- This just simplifies the process by not having to call :py:func:`~archinstall.Installer.install_profile` later on.
- :type profile: str, optional
-
- :param hostname: The given /etc/hostname for the machine.
- :type hostname: str, optional
-
- """
-
- def __init__(self, target :str, *, base_packages :Optional[List[str]] = None, kernels :Optional[List[str]] = None):
- if base_packages is None:
- base_packages = __packages__[:3]
- if kernels is None:
- self.kernels = ['linux']
- else:
- self.kernels = kernels
- self.target = target
+ def __init__(
+ self,
+ target: Path,
+ disk_config: disk.DiskLayoutConfiguration,
+ disk_encryption: Optional[disk.DiskEncryption] = None,
+ base_packages: List[str] = [],
+ kernels: Optional[List[str]] = None
+ ):
+ """
+ `Installer()` is the wrapper for most basic installation steps.
+ It also wraps :py:func:`~archinstall.Installer.pacstrap` among other things.
+ """
+ self._base_packages = base_packages or __packages__[:3]
+ self.kernels = kernels or ['linux']
+ self._disk_config = disk_config
+
+ self._disk_encryption = disk_encryption or disk.DiskEncryption(disk.EncryptionType.NoEncryption)
+ self.target: Path = target
+
self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S')
self.milliseconds = int(str(time.time()).split('.')[1])
+ self.helper_flags: Dict[str, Any] = {'base': False, 'bootloader': None}
- self.helper_flags = {
- 'base': False,
- 'bootloader': False
- }
-
- self.base_packages = base_packages.split(' ') if type(base_packages) is str else base_packages
for kernel in self.kernels:
- self.base_packages.append(kernel)
+ self._base_packages.append(kernel)
# If using accessibility tools in the live environment, append those to the packages list
if accessibility_tools_in_use():
- self.base_packages.extend(__accessibility_packages__)
+ self._base_packages.extend(__accessibility_packages__)
- self.post_base_install = []
+ self.post_base_install: List[Callable] = []
# TODO: Figure out which one of these two we'll use.. But currently we're mixing them..
storage['session'] = self
storage['installation_session'] = self
- self.MODULES = []
- self.BINARIES = []
- self.FILES = []
+ self._modules: List[str] = []
+ self._binaries: List[str] = []
+ self._files: List[str] = []
+
# systemd, sd-vconsole and sd-encrypt will be replaced by udev, keymap and encrypt
# if HSM is not used to encrypt the root volume. Check mkinitcpio() function for that override.
- self.HOOKS = ["base", "systemd", "autodetect", "keyboard", "sd-vconsole", "modconf", "block", "filesystems", "fsck"]
- self.KERNEL_PARAMS = []
- self.FSTAB_ENTRIES = []
+ self._hooks: List[str] = [
+ "base", "systemd", "autodetect", "microcode", "keyboard",
+ "sd-vconsole", "modconf", "block", "filesystems", "fsck"
+ ]
+ self._kernel_params: List[str] = []
+ self._fstab_entries: List[str] = []
self._zram_enabled = False
+ self._disable_fstrim = False
- self._disk_encryption: DiskEncryption = storage['arguments'].get('disk_encryption')
-
- def log(self, *args :str, level :int = logging.DEBUG, **kwargs :str):
- """
- installer.log() wraps output.log() mainly to set a default log-level for this install session.
- Any manual override can be done per log() call.
- """
- log(*args, level=level, **kwargs)
+ self.pacman = Pacman(self.target, storage['arguments'].get('silent', False))
- def __enter__(self, *args :str, **kwargs :str) -> 'Installer':
+ def __enter__(self) -> 'Installer':
return self
- def __exit__(self, *args :str, **kwargs :str) -> None:
- # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
-
- if len(args) >= 2 and args[1]:
- self.log(args[1], level=logging.ERROR, fg='red')
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ if exc_type is not None:
+ error(exc_val)
self.sync_log_to_install_medium()
# We avoid printing /mnt/<log path> because that might confuse people if they note it down
# and then reboot, and a identical log file will be found in the ISO medium anyway.
- print(_("[!] A log file has been created here: {}").format(os.path.join(storage['LOG_PATH'], storage['LOG_FILE'])))
+ print(_("[!] A log file has been created here: {}").format(
+ os.path.join(storage['LOG_PATH'], storage['LOG_FILE'])))
print(_(" Please submit this issue (and file) to https://github.com/archlinux/archinstall/issues"))
- raise args[1]
+ raise exc_val
if not (missing_steps := self.post_install_check()):
- self.log('Installation completed without any errors. You may now reboot.', fg='green', level=logging.INFO)
+ log('Installation completed without any errors. You may now reboot.', fg='green')
self.sync_log_to_install_medium()
-
return True
else:
- self.log('Some required steps were not successfully installed/configured before leaving the installer:', fg='red', level=logging.WARNING)
+ warn('Some required steps were not successfully installed/configured before leaving the installer:')
+
for step in missing_steps:
- self.log(f' - {step}', fg='red', level=logging.WARNING)
+ warn(f' - {step}')
- self.log(f"Detailed error logs can be found at: {storage['LOG_PATH']}", level=logging.WARNING)
- self.log("Submit this zip file as an issue to https://github.com/archlinux/archinstall/issues", level=logging.WARNING)
+ warn(f"Detailed error logs can be found at: {storage['LOG_PATH']}")
+ warn("Submit this zip file as an issue to https://github.com/archlinux/archinstall/issues")
self.sync_log_to_install_medium()
return False
- @property
- def partitions(self) -> List[Partition]:
- return get_partitions_in_use(self.target).values()
+ def remove_mod(self, mod: str):
+ if mod in self._modules:
+ self._modules.remove(mod)
- def sync_log_to_install_medium(self) -> bool:
- # Copy over the install log (if there is one) to the install medium if
- # at least the base has been strapped in, otherwise we won't have a filesystem/structure to copy to.
- if self.helper_flags.get('base-strapped', False) is True:
- if filename := storage.get('LOG_FILE', None):
- absolute_logfile = os.path.join(storage.get('LOG_PATH', './'), filename)
+ def append_mod(self, mod: str):
+ if mod not in self._modules:
+ self._modules.append(mod)
- if not os.path.isdir(f"{self.target}/{os.path.dirname(absolute_logfile)}"):
- os.makedirs(f"{self.target}/{os.path.dirname(absolute_logfile)}")
+ def _verify_service_stop(self):
+ """
+ Certain services might be running that affects the system during installation.
+ One such service is "reflector.service" which updates /etc/pacman.d/mirrorlist
+ We need to wait for it before we continue since we opted in to use a custom mirror/region.
+ """
- shutil.copy2(absolute_logfile, f"{self.target}/{absolute_logfile}")
+ if not storage['arguments'].get('skip_ntp', False):
+ info(_('Waiting for time sync (timedatectl show) to complete.'))
- return True
+ _started_wait = time.time()
+ _notified = False
+ while True:
+ if not _notified and time.time() - _started_wait > 5:
+ _notified = True
+ warn(
+ _("Time synchronization not completing, while you wait - check the docs for workarounds: https://archinstall.readthedocs.io/"))
- def _create_keyfile(self,luks_handle , partition :dict, password :str):
- """ roiutine to create keyfiles, so it can be moved elsewhere
- """
- if self._disk_encryption and self._disk_encryption.generate_encryption_file(partition):
- if not (cryptkey_dir := pathlib.Path(f"{self.target}/etc/cryptsetup-keys.d")).exists():
- cryptkey_dir.mkdir(parents=True)
- # Once we store the key as ../xyzloop.key systemd-cryptsetup can automatically load this key
- # if we name the device to "xyzloop".
- if partition.get('mountpoint',None):
- encryption_key_path = f"/etc/cryptsetup-keys.d/{pathlib.Path(partition['mountpoint']).name}loop.key"
- else:
- encryption_key_path = f"/etc/cryptsetup-keys.d/{pathlib.Path(partition['device_instance'].path).name}.key"
- with open(f"{self.target}{encryption_key_path}", "w") as keyfile:
- keyfile.write(generate_password(length=512))
+ time_val = SysCommand('timedatectl show --property=NTPSynchronized --value').decode()
+ if time_val and time_val.strip() == 'yes':
+ break
+ time.sleep(1)
+ else:
+ info(
+ _('Skipping waiting for automatic time sync (this can cause issues if time is out of sync during installation)'))
+
+ info('Waiting for automatic mirror selection (reflector) to complete.')
+ while self._service_state('reflector') not in ('dead', 'failed', 'exited'):
+ time.sleep(1)
+
+ # info('Waiting for pacman-init.service to complete.')
+ # while self._service_state('pacman-init') not in ('dead', 'failed', 'exited'):
+ # time.sleep(1)
- os.chmod(f"{self.target}{encryption_key_path}", 0o400)
+ info(_('Waiting for Arch Linux keyring sync (archlinux-keyring-wkd-sync) to complete.'))
+ # Wait for the timer to kick in
+ while self._service_started('archlinux-keyring-wkd-sync.timer') is None:
+ time.sleep(1)
- luks_handle.add_key(pathlib.Path(f"{self.target}{encryption_key_path}"), password=password)
- luks_handle.crypttab(self, encryption_key_path, options=["luks", "key-slot=1"])
+ # Wait for the service to enter a finished state
+ while self._service_state('archlinux-keyring-wkd-sync.service') not in ('dead', 'failed', 'exited'):
+ time.sleep(1)
- def _has_root(self, partition :dict) -> bool:
+ def _verify_boot_part(self):
"""
- Determine if an encrypted partition contains root in it
+ Check that mounted /boot device has at minimum size for installation
+ The reason this check is here is to catch pre-mounted device configuration and potentially
+ configured one that has not gone through any previous checks (e.g. --silence mode)
+
+ NOTE: this function should be run AFTER running the mount_ordered_layout function
"""
- if partition.get("mountpoint") is None:
- if (sub_list := partition.get("btrfs",{}).get('subvolumes',{})):
- for mountpoint in [sub_list[subvolume].get("mountpoint") if isinstance(subvolume, dict) else subvolume.mountpoint for subvolume in sub_list]:
- if mountpoint == '/':
- return True
- return False
- else:
- return False
- elif partition.get("mountpoint") == '/':
- return True
- else:
- return False
+ boot_mount = self.target / 'boot'
+ lsblk_info = disk.get_lsblk_by_mountpoint(boot_mount)
+
+ if len(lsblk_info) > 0:
+ if lsblk_info[0].size < disk.Size(200, disk.Unit.MiB, disk.SectorSize.default()):
+ raise DiskError(
+ f'The boot partition mounted at {boot_mount} is not large enough to install a boot loader. '
+ f'Please resize it to at least 200MiB and re-run the installation.'
+ )
- def mount_ordered_layout(self, layouts: Dict[str, Any]) -> None:
- from .luks import luks2
- from .disk.btrfs import setup_subvolumes, mount_subvolume
-
- # set the partitions as a list not part of a tree (which we don't need anymore (i think)
- list_part = []
- list_luks_handles = []
- for blockdevice in layouts:
- list_part.extend(layouts[blockdevice]['partitions'])
-
- # TODO: Implement a proper mount-queue system that does not depend on return values.
- mount_queue = {}
-
- # we manage the encrypted partititons
- if self._disk_encryption:
- for partition in self._disk_encryption.all_partitions:
- # open the luks device and all associate stuff
- loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}"
-
- # note that we DON'T auto_unmount (i.e. close the encrypted device so it can be used
- with (luks_handle := luks2(partition['device_instance'], loopdev, self._disk_encryption.encryption_password, auto_unmount=False)) as unlocked_device:
- if self._disk_encryption.generate_encryption_file(partition) and not self._has_root(partition):
- list_luks_handles.append([luks_handle, partition, self._disk_encryption.encryption_password])
- # this way all the requesrs will be to the dm_crypt device and not to the physical partition
- partition['device_instance'] = unlocked_device
-
- if self._has_root(partition) and self._disk_encryption.generate_encryption_file(partition) is False:
- if self._disk_encryption.hsm_device:
- Fido2.fido2_enroll(self._disk_encryption.hsm_device, partition['device_instance'], self._disk_encryption.encryption_password)
-
- btrfs_subvolumes = [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', [])]
-
- for partition in btrfs_subvolumes:
- device_instance = partition['device_instance']
- mount_options = partition.get('filesystem', {}).get('mount_options', [])
- self.mount(device_instance, "/", options=','.join(mount_options))
- setup_subvolumes(installation=self, partition_dict=partition)
- device_instance.unmount()
-
- # We then handle any special cases, such as btrfs
- for partition in btrfs_subvolumes:
- subvolumes: List[Subvolume] = partition['btrfs']['subvolumes']
- for subvolume in sorted(subvolumes, key=lambda item: item.mountpoint):
- # We cache the mount call for later
- mount_queue[subvolume.mountpoint] = lambda sub_vol=subvolume, device=partition['device_instance']: mount_subvolume(
- installation=self,
- device=device,
- subvolume=sub_vol
- )
+ def sanity_check(self):
+ # self._verify_boot_part()
+ self._verify_service_stop()
+
+ def mount_ordered_layout(self):
+ debug('Mounting ordered layout')
+
+ luks_handlers: Dict[Any, Luks2] = {}
+
+ match self._disk_encryption.encryption_type:
+ case disk.EncryptionType.NoEncryption:
+ self._mount_lvm_layout()
+ case disk.EncryptionType.Luks:
+ luks_handlers = self._prepare_luks_partitions(self._disk_encryption.partitions)
+ case disk.EncryptionType.LvmOnLuks:
+ luks_handlers = self._prepare_luks_partitions(self._disk_encryption.partitions)
+ self._import_lvm()
+ self._mount_lvm_layout(luks_handlers)
+ case disk.EncryptionType.LuksOnLvm:
+ self._import_lvm()
+ luks_handlers = self._prepare_luks_lvm(self._disk_encryption.lvm_volumes)
+ self._mount_lvm_layout(luks_handlers)
+
+ # mount all regular partitions
+ self._mount_partition_layout(luks_handlers)
+
+ def _mount_partition_layout(self, luks_handlers: Dict[Any, Luks2]):
+ debug('Mounting partition layout')
+
+ # do not mount any PVs part of the LVM configuration
+ pvs = []
+ if self._disk_config.lvm_config:
+ pvs = self._disk_config.lvm_config.get_all_pvs()
+
+ for mod in self._disk_config.device_modifications:
+ not_pv_part_mods = list(filter(lambda x: x not in pvs, mod.partitions))
+
+ # partitions have to mounted in the right order on btrfs the mountpoint will
+ # be empty as the actual subvolumes are getting mounted instead so we'll use
+ # '/' just for sorting
+ sorted_part_mods = sorted(not_pv_part_mods, key=lambda x: x.mountpoint or Path('/'))
+
+ for part_mod in sorted_part_mods:
+ if luks_handler := luks_handlers.get(part_mod):
+ self._mount_luks_partition(part_mod, luks_handler)
+ else:
+ self._mount_partition(part_mod)
- # We mount ordinary partitions, and we sort them by the mountpoint
- for partition in sorted([entry for entry in list_part if entry.get('mountpoint', False)], key=lambda part: part['mountpoint']):
- mountpoint = partition['mountpoint']
- log(f"Mounting {mountpoint} to {self.target}{mountpoint} using {partition['device_instance']}", level=logging.INFO)
+ def _mount_lvm_layout(self, luks_handlers: Dict[Any, Luks2] = {}):
+ lvm_config = self._disk_config.lvm_config
- if partition.get('filesystem',{}).get('mount_options',[]):
- mount_options = ','.join(partition['filesystem']['mount_options'])
- mount_queue[mountpoint] = lambda instance=partition['device_instance'], target=f"{self.target}{mountpoint}", options=mount_options: instance.mount(target, options=options)
- else:
- mount_queue[mountpoint] = lambda instance=partition['device_instance'], target=f"{self.target}{mountpoint}": instance.mount(target)
+ if not lvm_config:
+ debug('No lvm config defined to be mounted')
+ return
- log(f"Using mount order: {list(sorted(mount_queue.items(), key=lambda item: item[0]))}", level=logging.DEBUG, fg="white")
+ debug('Mounting LVM layout')
- # We mount everything by sorting on the mountpoint itself.
- for mountpoint, frozen_func in sorted(mount_queue.items(), key=lambda item: item[0]):
- frozen_func()
+ for vg in lvm_config.vol_groups:
+ sorted_vol = sorted(vg.volumes, key=lambda x: x.mountpoint or Path('/'))
- time.sleep(1)
+ for vol in sorted_vol:
+ if luks_handler := luks_handlers.get(vol):
+ self._mount_luks_volume(vol, luks_handler)
+ else:
+ self._mount_lvm_vol(vol)
+
+ def _prepare_luks_partitions(
+ self,
+ partitions: List[disk.PartitionModification]
+ ) -> Dict[disk.PartitionModification, Luks2]:
+ return {
+ part_mod: disk.device_handler.unlock_luks2_dev(
+ part_mod.dev_path,
+ part_mod.mapper_name,
+ self._disk_encryption.encryption_password
+ )
+ for part_mod in partitions
+ if part_mod.mapper_name and part_mod.dev_path
+ }
- try:
- findmnt(pathlib.Path(f"{self.target}{mountpoint}"), traverse=False)
- except DiskError:
- raise DiskError(f"Target {self.target}{mountpoint} never got mounted properly (unable to get mount information using findmnt).")
+ def _import_lvm(self):
+ lvm_config = self._disk_config.lvm_config
+
+ if not lvm_config:
+ debug('No lvm config defined to be imported')
+ return
+
+ for vg in lvm_config.vol_groups:
+ disk.device_handler.lvm_import_vg(vg)
+
+ for vol in vg.volumes:
+ disk.device_handler.lvm_vol_change(vol, True)
+
+ def _prepare_luks_lvm(
+ self,
+ lvm_volumes: List[disk.LvmVolume]
+ ) -> Dict[disk.LvmVolume, Luks2]:
+ return {
+ vol: disk.device_handler.unlock_luks2_dev(
+ vol.dev_path,
+ vol.mapper_name,
+ self._disk_encryption.encryption_password
+ )
+ for vol in lvm_volumes
+ if vol.mapper_name and vol.dev_path
+ }
+
+ def _mount_partition(self, part_mod: disk.PartitionModification):
+ # it would be none if it's btrfs as the subvolumes will have the mountpoints defined
+ if part_mod.mountpoint and part_mod.dev_path:
+ target = self.target / part_mod.relative_mountpoint
+ disk.device_handler.mount(part_mod.dev_path, target, options=part_mod.mount_options)
+
+ if part_mod.fs_type == disk.FilesystemType.Btrfs and part_mod.dev_path:
+ self._mount_btrfs_subvol(
+ part_mod.dev_path,
+ part_mod.btrfs_subvols,
+ part_mod.mount_options
+ )
+
+ def _mount_lvm_vol(self, volume: disk.LvmVolume):
+ if volume.fs_type != disk.FilesystemType.Btrfs:
+ if volume.mountpoint and volume.dev_path:
+ target = self.target / volume.relative_mountpoint
+ disk.device_handler.mount(volume.dev_path, target, options=volume.mount_options)
+
+ if volume.fs_type == disk.FilesystemType.Btrfs and volume.dev_path:
+ self._mount_btrfs_subvol(volume.dev_path, volume.btrfs_subvols, volume.mount_options)
+
+ def _mount_luks_partition(self, part_mod: disk.PartitionModification, luks_handler: Luks2):
+ if part_mod.fs_type != disk.FilesystemType.Btrfs:
+ if part_mod.mountpoint and luks_handler.mapper_dev:
+ target = self.target / part_mod.relative_mountpoint
+ disk.device_handler.mount(luks_handler.mapper_dev, target, options=part_mod.mount_options)
+
+ if part_mod.fs_type == disk.FilesystemType.Btrfs and luks_handler.mapper_dev:
+ self._mount_btrfs_subvol(luks_handler.mapper_dev, part_mod.btrfs_subvols, part_mod.mount_options)
+
+ def _mount_luks_volume(self, volume: disk.LvmVolume, luks_handler: Luks2):
+ if volume.fs_type != disk.FilesystemType.Btrfs:
+ if volume.mountpoint and luks_handler.mapper_dev:
+ target = self.target / volume.relative_mountpoint
+ disk.device_handler.mount(luks_handler.mapper_dev, target, options=volume.mount_options)
+
+ if volume.fs_type == disk.FilesystemType.Btrfs and luks_handler.mapper_dev:
+ self._mount_btrfs_subvol(luks_handler.mapper_dev, volume.btrfs_subvols, volume.mount_options)
+
+ def _mount_btrfs_subvol(
+ self,
+ dev_path: Path,
+ subvolumes: List[disk.SubvolumeModification],
+ mount_options: List[str] = []
+ ):
+ for subvol in subvolumes:
+ mountpoint = self.target / subvol.relative_mountpoint
+ mount_options = mount_options + [f'subvol={subvol.name}']
+ disk.device_handler.mount(dev_path, mountpoint, options=mount_options)
+
+ def generate_key_files(self):
+ match self._disk_encryption.encryption_type:
+ case disk.EncryptionType.Luks:
+ self._generate_key_files_partitions()
+ case disk.EncryptionType.LuksOnLvm:
+ self._generate_key_file_lvm_volumes()
+ case disk.EncryptionType.LvmOnLuks:
+ # currently LvmOnLuks only supports a single
+ # partitioning layout (boot + partition)
+ # so we won't need any keyfile generation atm
+ pass
+
+ def _generate_key_files_partitions(self):
+ for part_mod in self._disk_encryption.partitions:
+ gen_enc_file = self._disk_encryption.should_generate_encryption_file(part_mod)
+
+ luks_handler = Luks2(
+ part_mod.safe_dev_path,
+ mapper_name=part_mod.mapper_name,
+ password=self._disk_encryption.encryption_password
+ )
+
+ if gen_enc_file and not part_mod.is_root():
+ debug(f'Creating key-file: {part_mod.dev_path}')
+ luks_handler.create_keyfile(self.target)
+
+ if part_mod.is_root() and not gen_enc_file:
+ if self._disk_encryption.hsm_device:
+ disk.Fido2.fido2_enroll(
+ self._disk_encryption.hsm_device,
+ part_mod.safe_dev_path,
+ self._disk_encryption.encryption_password
+ )
+
+ def _generate_key_file_lvm_volumes(self):
+ for vol in self._disk_encryption.lvm_volumes:
+ gen_enc_file = self._disk_encryption.should_generate_encryption_file(vol)
+
+ luks_handler = Luks2(
+ vol.safe_dev_path,
+ mapper_name=vol.mapper_name,
+ password=self._disk_encryption.encryption_password
+ )
+
+ if gen_enc_file and not vol.is_root():
+ info(f'Creating key-file: {vol.dev_path}')
+ luks_handler.create_keyfile(self.target)
- # once everything is mounted, we generate the key files in the correct place
- for handle in list_luks_handles:
- ppath = handle[1]['device_instance'].path
- log(f"creating key-file for {ppath}",level=logging.INFO)
- self._create_keyfile(handle[0],handle[1],handle[2])
+ if vol.is_root() and not gen_enc_file:
+ if self._disk_encryption.hsm_device:
+ disk.Fido2.fido2_enroll(
+ self._disk_encryption.hsm_device,
+ vol.safe_dev_path,
+ self._disk_encryption.encryption_password
+ )
+
+ def sync_log_to_install_medium(self) -> bool:
+ # Copy over the install log (if there is one) to the install medium if
+ # at least the base has been strapped in, otherwise we won't have a filesystem/structure to copy to.
+ if self.helper_flags.get('base-strapped', False) is True:
+ if filename := storage.get('LOG_FILE', None):
+ absolute_logfile = os.path.join(storage.get('LOG_PATH', './'), filename)
+
+ if not os.path.isdir(f"{self.target}/{os.path.dirname(absolute_logfile)}"):
+ os.makedirs(f"{self.target}/{os.path.dirname(absolute_logfile)}")
- def mount(self, partition :Partition, mountpoint :str, create_mountpoint :bool = True, options='') -> None:
- if create_mountpoint and not os.path.isdir(f'{self.target}{mountpoint}'):
- os.makedirs(f'{self.target}{mountpoint}')
+ shutil.copy2(absolute_logfile, f"{self.target}/{absolute_logfile}")
- partition.mount(f'{self.target}{mountpoint}', options=options)
+ return True
def add_swapfile(self, size='4G', enable_resume=True, file='/swapfile'):
if file[:1] != '/':
@@ -329,176 +433,137 @@ class Installer:
SysCommand(f'chmod 0600 {self.target}{file}')
SysCommand(f'mkswap {self.target}{file}')
- self.FSTAB_ENTRIES.append(f'{file} none swap defaults 0 0')
+ self._fstab_entries.append(f'{file} none swap defaults 0 0')
if enable_resume:
- resume_uuid = SysCommand(f'findmnt -no UUID -T {self.target}{file}').decode('UTF-8').strip()
- resume_offset = SysCommand(f'/usr/bin/filefrag -v {self.target}{file}').decode('UTF-8').split('0:', 1)[1].split(":", 1)[1].split("..", 1)[0].strip()
+ resume_uuid = SysCommand(f'findmnt -no UUID -T {self.target}{file}').decode()
+ resume_offset = SysCommand(
+ f'/usr/bin/filefrag -v {self.target}{file}'
+ ).decode().split('0:', 1)[1].split(":", 1)[1].split("..", 1)[0].strip()
- self.HOOKS.append('resume')
- self.KERNEL_PARAMS.append(f'resume=UUID={resume_uuid}')
- self.KERNEL_PARAMS.append(f'resume_offset={resume_offset}')
+ self._hooks.append('resume')
+ self._kernel_params.append(f'resume=UUID={resume_uuid}')
+ self._kernel_params.append(f'resume_offset={resume_offset}')
- def post_install_check(self, *args :str, **kwargs :str) -> List[str]:
+ def post_install_check(self, *args: str, **kwargs: str) -> List[str]:
return [step for step, flag in self.helper_flags.items() if flag is False]
- def enable_multilib_repository(self):
- # Set up a regular expression pattern of a commented line containing 'multilib' within []
- pattern = re.compile(r"^#\s*\[multilib\]$")
-
- # This is used to track if the previous line is a match, so we end up uncommenting the line after the block.
- matched = False
-
- # Read in the lines from the original file
- with open("/etc/pacman.conf", "r") as pacman_conf:
- lines = pacman_conf.readlines()
-
- # Open the file again in write mode, to replace the contents
- with open("/etc/pacman.conf", "w") as pacman_conf:
- for line in lines:
- if pattern.match(line):
- # If this is the [] block containing 'multilib', uncomment it and set the matched tracking boolean.
- pacman_conf.write(line.lstrip('#'))
- matched = True
- elif matched:
- # The previous line was a match for [.*multilib.*].
- # This means we're on a line that looks like '#Include = /etc/pacman.d/mirrorlist'
- pacman_conf.write(line.lstrip('#'))
- matched = False # Reset the state of matched to False.
- else:
- pacman_conf.write(line)
-
- def enable_testing_repositories(self, enable_multilib_testing=False):
- # Set up a regular expression pattern of a commented line containing 'testing' within []
- pattern = re.compile("^#\\[.*testing.*\\]$")
-
- # This is used to track if the previous line is a match, so we end up uncommenting the line after the block.
- matched = False
-
- # Read in the lines from the original file
- with open("/etc/pacman.conf", "r") as pacman_conf:
- lines = pacman_conf.readlines()
-
- # Open the file again in write mode, to replace the contents
- with open("/etc/pacman.conf", "w") as pacman_conf:
- for line in lines:
- if pattern.match(line) and (enable_multilib_testing or 'multilib' not in line):
- # If this is the [] block containing 'testing', uncomment it and set the matched tracking boolean.
- pacman_conf.write(line.lstrip('#'))
- matched = True
- elif matched:
- # The previous line was a match for [.*testing.*].
- # This means we're on a line that looks like '#Include = /etc/pacman.d/mirrorlist'
- pacman_conf.write(line.lstrip('#'))
- matched = False # Reset the state of matched to False.
- else:
- pacman_conf.write(line)
-
- def pacstrap(self, *packages :str, **kwargs :str) -> bool:
- if type(packages[0]) in (list, tuple):
- packages = packages[0]
-
- for plugin in plugins.values():
- if hasattr(plugin, 'on_pacstrap'):
- if (result := plugin.on_pacstrap(packages)):
- packages = result
+ def set_mirrors(self, mirror_config: MirrorConfiguration, on_target: bool = False):
+ """
+ Set the mirror configuration for the installation.
- self.log(f'Installing packages: {packages}', level=logging.INFO)
+ :param mirror_config: The mirror configuration to use.
+ :type mirror_config: MirrorConfiguration
- # TODO: We technically only need to run the -Syy once.
- try:
- run_pacman('-Syy', default_cmd='/usr/bin/pacman')
- except SysCallError as error:
- self.log(f'Could not sync a new package database: {error}', level=logging.ERROR, fg="red")
+ :on_target: Whether to set the mirrors on the target system or the live system.
+ :param on_target: bool
+ """
+ debug('Setting mirrors')
- if storage['arguments'].get('silent', False) is False:
- if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'):
- return self.pacstrap(*packages, **kwargs)
+ for plugin in plugins.values():
+ if hasattr(plugin, 'on_mirrors'):
+ if result := plugin.on_mirrors(mirror_config):
+ mirror_config = result
- raise RequirementError(f'Could not sync mirrors: {error}', level=logging.ERROR, fg="red")
+ if on_target:
+ local_pacman_conf = Path(f'{self.target}/etc/pacman.conf')
+ local_mirrorlist_conf = Path(f'{self.target}/etc/pacman.d/mirrorlist')
+ else:
+ local_pacman_conf = Path('/etc/pacman.conf')
+ local_mirrorlist_conf = Path('/etc/pacman.d/mirrorlist')
- try:
- SysCommand(f'/usr/bin/pacstrap -C /etc/pacman.conf -K {self.target} {" ".join(packages)} --noconfirm', peek_output=True)
- return True
- except SysCallError as error:
- self.log(f'Could not strap in packages: {error}', level=logging.ERROR, fg="red")
+ mirrorlist_config = mirror_config.mirrorlist_config()
+ pacman_config = mirror_config.pacman_config()
- if storage['arguments'].get('silent', False) is False:
- if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'):
- return self.pacstrap(*packages, **kwargs)
+ if pacman_config:
+ debug(f'Pacman config: {pacman_config}')
- raise RequirementError("Pacstrap failed. See /var/log/archinstall/install.log or above message for error details.")
+ with local_pacman_conf.open('a') as fp:
+ fp.write(pacman_config)
- def set_mirrors(self, mirrors :Mapping[str, Iterator[str]]) -> None:
- for plugin in plugins.values():
- if hasattr(plugin, 'on_mirrors'):
- if result := plugin.on_mirrors(mirrors):
- mirrors = result
+ if mirrorlist_config:
+ debug(f'Mirrorlist: {mirrorlist_config}')
- return use_mirrors(mirrors, destination=f'{self.target}/etc/pacman.d/mirrorlist')
+ with local_mirrorlist_conf.open('a') as fp:
+ fp.write(mirrorlist_config)
- def genfstab(self, flags :str = '-pU') -> bool:
- self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO)
+ def genfstab(self, flags: str = '-pU'):
+ fstab_path = self.target / "etc" / "fstab"
+ info(f"Updating {fstab_path}")
try:
- fstab = SysCommand(f'/usr/bin/genfstab {flags} {self.target}')
- except SysCallError as error:
- raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n Error: {error}')
+ gen_fstab = SysCommand(f'/usr/bin/genfstab {flags} {self.target}').output()
+ except SysCallError as err:
+ raise RequirementError(
+ f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n Error: {err}')
- with open(f"{self.target}/etc/fstab", 'a') as fstab_fh:
- fstab_fh.write(fstab.decode())
+ with open(fstab_path, 'ab') as fp:
+ fp.write(gen_fstab)
- if not os.path.isfile(f'{self.target}/etc/fstab'):
- raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n Error: {fstab}')
+ if not fstab_path.is_file():
+ raise RequirementError(f'Could not create fstab file')
for plugin in plugins.values():
if hasattr(plugin, 'on_genfstab'):
if plugin.on_genfstab(self) is True:
break
- with open(f"{self.target}/etc/fstab", 'a') as fstab_fh:
- for entry in self.FSTAB_ENTRIES:
- fstab_fh.write(f'{entry}\n')
+ with open(fstab_path, 'a') as fp:
+ for entry in self._fstab_entries:
+ fp.write(f'{entry}\n')
- return True
-
- def set_hostname(self, hostname: str, *args :str, **kwargs :str) -> None:
+ def set_hostname(self, hostname: str):
with open(f'{self.target}/etc/hostname', 'w') as fh:
fh.write(hostname + '\n')
- def set_locale(self, locale :str, encoding :str = 'UTF-8', *args :str, **kwargs :str) -> bool:
- if not len(locale):
- return True
-
+ def set_locale(self, locale_config: LocaleConfiguration) -> bool:
modifier = ''
+ lang = locale_config.sys_lang
+ encoding = locale_config.sys_enc
# This is a temporary patch to fix #1200
- if '.' in locale:
- locale, potential_encoding = locale.split('.', 1)
+ if '.' in locale_config.sys_lang:
+ lang, potential_encoding = locale_config.sys_lang.split('.', 1)
# Override encoding if encoding is set to the default parameter
# and the "found" encoding differs.
- if encoding == 'UTF-8' and encoding != potential_encoding:
+ if locale_config.sys_enc == 'UTF-8' and locale_config.sys_enc != potential_encoding:
encoding = potential_encoding
# Make sure we extract the modifier, that way we can put it in if needed.
- if '@' in locale:
- locale, modifier = locale.split('@', 1)
+ if '@' in locale_config.sys_lang:
+ lang, modifier = locale_config.sys_lang.split('@', 1)
modifier = f"@{modifier}"
# - End patch
- with open(f'{self.target}/etc/locale.gen', 'a') as fh:
- fh.write(f'{locale}.{encoding}{modifier} {encoding}\n')
- with open(f'{self.target}/etc/locale.conf', 'w') as fh:
- fh.write(f'LANG={locale}.{encoding}{modifier}\n')
+ locale_gen = self.target / 'etc/locale.gen'
+ locale_gen_lines = locale_gen.read_text().splitlines(True)
+
+ # A locale entry in /etc/locale.gen may or may not contain the encoding
+ # in the first column of the entry; check for both cases.
+ entry_re = re.compile(rf'#{lang}(\.{encoding})?{modifier} {encoding}')
+
+ for index, line in enumerate(locale_gen_lines):
+ if entry_re.match(line):
+ uncommented_line = line.removeprefix('#')
+ locale_gen_lines[index] = uncommented_line
+ locale_gen.write_text(''.join(locale_gen_lines))
+ lang_value = uncommented_line.split()[0]
+ break
+ else:
+ error(f"Invalid locale: language '{locale_config.sys_lang}', encoding '{locale_config.sys_enc}'")
+ return False
try:
SysCommand(f'/usr/bin/arch-chroot {self.target} locale-gen')
- return True
- except SysCallError:
+ except SysCallError as e:
+ error(f'Failed to run locale-gen on target: {e}')
return False
- def set_timezone(self, zone :str, *args :str, **kwargs :str) -> bool:
+ (self.target / 'etc/locale.conf').write_text(f'LANG={lang_value}\n')
+ return True
+
+ def set_timezone(self, zone: str) -> bool:
if not zone:
return True
if not len(zone):
@@ -509,62 +574,49 @@ class Installer:
if result := plugin.on_timezone(zone):
zone = result
- if (pathlib.Path("/usr") / "share" / "zoneinfo" / zone).exists():
- (pathlib.Path(self.target) / "etc" / "localtime").unlink(missing_ok=True)
+ if (Path("/usr") / "share" / "zoneinfo" / zone).exists():
+ (Path(self.target) / "etc" / "localtime").unlink(missing_ok=True)
SysCommand(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{zone} /etc/localtime')
return True
else:
- self.log(
- f"Time zone {zone} does not exist, continuing with system default.",
- level=logging.WARNING,
- fg='red'
- )
+ warn(f'Time zone {zone} does not exist, continuing with system default')
return False
- def activate_ntp(self) -> None:
- log(f"activate_ntp() is deprecated, use activate_time_syncronization()", fg="yellow", level=logging.INFO)
- self.activate_time_syncronization()
-
- def activate_time_syncronization(self) -> None:
- self.log('Activating systemd-timesyncd for time synchronization using Arch Linux and ntp.org NTP servers.', level=logging.INFO)
+ def activate_time_synchronization(self) -> None:
+ info('Activating systemd-timesyncd for time synchronization using Arch Linux and ntp.org NTP servers')
self.enable_service('systemd-timesyncd')
- with open(f"{self.target}/etc/systemd/timesyncd.conf", "w") as fh:
- fh.write("[Time]\n")
- fh.write("NTP=0.arch.pool.ntp.org 1.arch.pool.ntp.org 2.arch.pool.ntp.org 3.arch.pool.ntp.org\n")
- fh.write("FallbackNTP=0.pool.ntp.org 1.pool.ntp.org 0.fr.pool.ntp.org\n")
-
- from .systemd import Boot
- with Boot(self) as session:
- session.SysCommand(["timedatectl", "set-ntp", 'true'])
-
def enable_espeakup(self) -> None:
- self.log('Enabling espeakup.service for speech synthesis (accessibility).', level=logging.INFO)
+ info('Enabling espeakup.service for speech synthesis (accessibility)')
self.enable_service('espeakup')
def enable_periodic_trim(self) -> None:
- self.log("Enabling periodic TRIM")
+ info("Enabling periodic TRIM")
# fstrim is owned by util-linux, a dependency of both base and systemd.
self.enable_service("fstrim.timer")
- def enable_service(self, *services :str) -> None:
+ def enable_service(self, services: Union[str, List[str]]) -> None:
+ if isinstance(services, str):
+ services = [services]
+
for service in services:
- self.log(f'Enabling service {service}', level=logging.INFO)
+ info(f'Enabling service {service}')
+
try:
self.arch_chroot(f'systemctl enable {service}')
- except SysCallError as error:
- raise ServiceException(f"Unable to start service {service}: {error}")
+ except SysCallError as err:
+ raise ServiceException(f"Unable to start service {service}: {err}")
for plugin in plugins.values():
if hasattr(plugin, 'on_service'):
plugin.on_service(service)
- def run_command(self, cmd :str, *args :str, **kwargs :str) -> None:
+ def run_command(self, cmd: str, *args: str, **kwargs: str) -> SysCommand:
return SysCommand(f'/usr/bin/arch-chroot {self.target} {cmd}')
- def arch_chroot(self, cmd :str, run_as :Optional[str] = None):
+ def arch_chroot(self, cmd: str, run_as: Optional[str] = None) -> SysCommand:
if run_as:
cmd = f"su - {run_as} -c {shlex.quote(cmd)}"
@@ -573,38 +625,23 @@ class Installer:
def drop_to_shell(self) -> None:
subprocess.check_call(f"/usr/bin/arch-chroot {self.target}", shell=True)
- def configure_nic(self, network_config: NetworkConfiguration) -> None:
- from .systemd import Networkd
-
- if network_config.dhcp:
- conf = Networkd(Match={"Name": network_config.iface}, Network={"DHCP": "yes"})
- else:
- network = {"Address": network_config.ip}
- if network_config.gateway:
- network["Gateway"] = network_config.gateway
- if network_config.dns:
- dns = network_config.dns
- network["DNS"] = dns if isinstance(dns, list) else [dns]
-
- conf = Networkd(Match={"Name": network_config.iface}, Network=network)
+ def configure_nic(self, nic: Nic):
+ conf = nic.as_systemd_config()
for plugin in plugins.values():
if hasattr(plugin, 'on_configure_nic'):
- new_conf = plugin.on_configure_nic(
- network_config.iface,
- network_config.dhcp,
- network_config.ip,
- network_config.gateway,
- network_config.dns
- )
-
- if new_conf:
- conf = new_conf
-
- with open(f"{self.target}/etc/systemd/network/10-{network_config.iface}.network", "a") as netconf:
+ conf = plugin.on_configure_nic(
+ nic.iface,
+ nic.dhcp,
+ nic.ip,
+ nic.gateway,
+ nic.dns
+ ) or conf
+
+ with open(f"{self.target}/etc/systemd/network/10-{nic.iface}.network", "a") as netconf:
netconf.write(str(conf))
- def copy_iso_network_config(self, enable_services :bool = False) -> bool:
+ def copy_iso_network_config(self, enable_services: bool = False) -> bool:
# Copy (if any) iwd password and config files
if os.path.isdir('/var/lib/iwd/'):
if psk_files := glob.glob('/var/lib/iwd/*.psk'):
@@ -614,19 +651,19 @@ class Installer:
if enable_services:
# If we haven't installed the base yet (function called pre-maturely)
if self.helper_flags.get('base', False) is False:
- self.base_packages.append('iwd')
+ self._base_packages.append('iwd')
# This function will be called after minimal_installation()
# as a hook for post-installs. This hook is only needed if
# base is not installed yet.
- def post_install_enable_iwd_service(*args :str, **kwargs :str):
+ def post_install_enable_iwd_service(*args: str, **kwargs: str):
self.enable_service('iwd')
self.post_base_install.append(post_install_enable_iwd_service)
# Otherwise, we can go ahead and add the required package
# and enable it's service:
else:
- self.pacstrap('iwd')
+ self.pacman.strap('iwd')
self.enable_service('iwd')
for psk in psk_files:
@@ -644,179 +681,208 @@ class Installer:
# If we haven't installed the base yet (function called pre-maturely)
if self.helper_flags.get('base', False) is False:
- def post_install_enable_networkd_resolved(*args :str, **kwargs :str):
- self.enable_service('systemd-networkd', 'systemd-resolved')
+ def post_install_enable_networkd_resolved(*args: str, **kwargs: str):
+ self.enable_service(['systemd-networkd', 'systemd-resolved'])
self.post_base_install.append(post_install_enable_networkd_resolved)
# Otherwise, we can go ahead and enable the services
else:
- self.enable_service('systemd-networkd', 'systemd-resolved')
+ self.enable_service(['systemd-networkd', 'systemd-resolved'])
return True
- def detect_encryption(self, partition :Partition) -> bool:
- from .disk.mapperdev import MapperDev
- from .disk.dmcryptdev import DMCryptDev
- from .disk.helpers import get_filesystem_type
-
- if type(partition) is MapperDev:
- # Returns MapperDev.partition
- return partition.partition
- elif type(partition) is DMCryptDev:
- return partition.MapperDev.partition
- elif get_filesystem_type(partition.path) == 'crypto_LUKS':
- return partition
-
- return False
-
- def mkinitcpio(self, *flags :str) -> bool:
+ def mkinitcpio(self, flags: List[str]) -> bool:
for plugin in plugins.values():
if hasattr(plugin, 'on_mkinitcpio'):
# Allow plugins to override the usage of mkinitcpio altogether.
if plugin.on_mkinitcpio(self):
return True
- # mkinitcpio will error out if there's no vconsole.
- if (vconsole := pathlib.Path(f"{self.target}/etc/vconsole.conf")).exists() is False:
- with vconsole.open('w') as fh:
- fh.write(f"KEYMAP={storage['arguments']['keyboard-layout']}\n")
-
with open(f'{self.target}/etc/mkinitcpio.conf', 'w') as mkinit:
- mkinit.write(f"MODULES=({' '.join(self.MODULES)})\n")
- mkinit.write(f"BINARIES=({' '.join(self.BINARIES)})\n")
- mkinit.write(f"FILES=({' '.join(self.FILES)})\n")
+ mkinit.write(f"MODULES=({' '.join(self._modules)})\n")
+ mkinit.write(f"BINARIES=({' '.join(self._binaries)})\n")
+ mkinit.write(f"FILES=({' '.join(self._files)})\n")
- if self._disk_encryption and not self._disk_encryption.hsm_device:
+ if not self._disk_encryption.hsm_device:
# For now, if we don't use HSM we revert to the old
# way of setting up encryption hooks for mkinitcpio.
# This is purely for stability reasons, we're going away from this.
# * systemd -> udev
# * sd-vconsole -> keymap
- self.HOOKS = [hook.replace('systemd', 'udev').replace('sd-vconsole', 'keymap') for hook in self.HOOKS]
+ self._hooks = [hook.replace('systemd', 'udev').replace('sd-vconsole', 'keymap') for hook in self._hooks]
- mkinit.write(f"HOOKS=({' '.join(self.HOOKS)})\n")
+ mkinit.write(f"HOOKS=({' '.join(self._hooks)})\n")
try:
- SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}')
+ SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}', peek_output=True)
return True
- except SysCallError:
+ except SysCallError as error:
+ if error.worker:
+ log(error.worker._trace_log.decode())
return False
- def minimal_installation(
- self, testing: bool = False, multilib: bool = False,
- hostname: str = 'archinstall', locales: List[str] = ['en_US.UTF-8 UTF-8']) -> bool:
- # Add necessary packages if encrypting the drive
- # (encrypted partitions default to btrfs for now, so we need btrfs-progs)
- # TODO: Perhaps this should be living in the function which dictates
- # the partitioning. Leaving here for now.
-
- for partition in self.partitions:
- if partition.filesystem == 'btrfs':
- # if partition.encrypted:
- if 'btrfs-progs' not in self.base_packages:
- self.base_packages.append('btrfs-progs')
- if partition.filesystem == 'xfs':
- if 'xfs' not in self.base_packages:
- self.base_packages.append('xfsprogs')
- if partition.filesystem == 'f2fs':
- if 'f2fs' not in self.base_packages:
- self.base_packages.append('f2fs-tools')
-
- # Configure mkinitcpio to handle some specific use cases.
- if partition.filesystem == 'btrfs':
- if 'btrfs' not in self.MODULES:
- self.MODULES.append('btrfs')
- if '/usr/bin/btrfs' not in self.BINARIES:
- self.BINARIES.append('/usr/bin/btrfs')
- # There is not yet an fsck tool for NTFS. If it's being used for the root filesystem, the hook should be removed.
- if partition.filesystem == 'ntfs3' and partition.mountpoint == self.target:
- if 'fsck' in self.HOOKS:
- self.HOOKS.remove('fsck')
-
- if self.detect_encryption(partition):
- if self._disk_encryption and self._disk_encryption.hsm_device:
- # Required bby mkinitcpio to add support for fido2-device options
- self.pacstrap('libfido2')
-
- if 'sd-encrypt' not in self.HOOKS:
- self.HOOKS.insert(self.HOOKS.index('filesystems'), 'sd-encrypt')
- else:
- if 'encrypt' not in self.HOOKS:
- self.HOOKS.insert(self.HOOKS.index('filesystems'), 'encrypt')
-
- if not has_uefi():
- self.base_packages.append('grub')
-
- if not is_vm():
- vendor = cpu_vendor()
- if vendor == "AuthenticAMD":
- self.base_packages.append("amd-ucode")
- if (ucode := pathlib.Path(f"{self.target}/boot/amd-ucode.img")).exists():
- ucode.unlink()
- elif vendor == "GenuineIntel":
- self.base_packages.append("intel-ucode")
- if (ucode := pathlib.Path(f"{self.target}/boot/intel-ucode.img")).exists():
- ucode.unlink()
+ def _get_microcode(self) -> Optional[Path]:
+ if not SysInfo.is_vm():
+ if vendor := SysInfo.cpu_vendor():
+ return vendor.get_ucode()
+ return None
+
+ def _handle_partition_installation(self):
+ pvs = []
+ if self._disk_config.lvm_config:
+ pvs = self._disk_config.lvm_config.get_all_pvs()
+
+ for mod in self._disk_config.device_modifications:
+ for part in mod.partitions:
+ if part in pvs or part.fs_type is None:
+ continue
+
+ if (pkg := part.fs_type.installation_pkg) is not None:
+ self._base_packages.append(pkg)
+ if (module := part.fs_type.installation_module) is not None:
+ self._modules.append(module)
+ if (binary := part.fs_type.installation_binary) is not None:
+ self._binaries.append(binary)
+
+ # https://github.com/archlinux/archinstall/issues/1837
+ if part.fs_type.fs_type_mount == 'btrfs':
+ self._disable_fstrim = True
+
+ # There is not yet an fsck tool for NTFS. If it's being used for the root filesystem, the hook should be removed.
+ if part.fs_type.fs_type_mount == 'ntfs3' and part.mountpoint == self.target:
+ if 'fsck' in self._hooks:
+ self._hooks.remove('fsck')
+
+ if part in self._disk_encryption.partitions:
+ if self._disk_encryption.hsm_device:
+ # Required by mkinitcpio to add support for fido2-device options
+ self.pacman.strap('libfido2')
+
+ if 'sd-encrypt' not in self._hooks:
+ self._hooks.insert(self._hooks.index('filesystems'), 'sd-encrypt')
+ else:
+ if 'encrypt' not in self._hooks:
+ self._hooks.insert(self._hooks.index('filesystems'), 'encrypt')
+
+ def _handle_lvm_installation(self):
+ if not self._disk_config.lvm_config:
+ return
+
+ self.add_additional_packages('lvm2')
+ self._hooks.insert(self._hooks.index('filesystems') - 1, 'lvm2')
+
+ for vg in self._disk_config.lvm_config.vol_groups:
+ for vol in vg.volumes:
+ if vol.fs_type is not None:
+ if (pkg := vol.fs_type.installation_pkg) is not None:
+ self._base_packages.append(pkg)
+ if (module := vol.fs_type.installation_module) is not None:
+ self._modules.append(module)
+ if (binary := vol.fs_type.installation_binary) is not None:
+ self._binaries.append(binary)
+
+ if vol.fs_type.fs_type_mount == 'btrfs':
+ self._disable_fstrim = True
+
+ # There is not yet an fsck tool for NTFS. If it's being used for the root filesystem, the hook should be removed.
+ if vol.fs_type.fs_type_mount == 'ntfs3' and vol.mountpoint == self.target:
+ if 'fsck' in self._hooks:
+ self._hooks.remove('fsck')
+
+ if self._disk_encryption.encryption_type in [disk.EncryptionType.LvmOnLuks, disk.EncryptionType.LuksOnLvm]:
+ if self._disk_encryption.hsm_device:
+ # Required by mkinitcpio to add support for fido2-device options
+ self.pacman.strap('libfido2')
+
+ if 'sd-encrypt' not in self._hooks:
+ self._hooks.insert(self._hooks.index('lvm2') - 1, 'sd-encrypt')
else:
- self.log(f"Unknown CPU vendor '{vendor}' detected. Archinstall won't install any ucode.", level=logging.DEBUG)
+ if 'encrypt' not in self._hooks:
+ self._hooks.insert(self._hooks.index('lvm2') - 1, 'encrypt')
+
+ def minimal_installation(
+ self,
+ testing: bool = False,
+ multilib: bool = False,
+ mkinitcpio: bool = True,
+ hostname: str = 'archinstall',
+ locale_config: LocaleConfiguration = LocaleConfiguration.default()
+ ):
+ if self._disk_config.lvm_config:
+ self._handle_lvm_installation()
+ else:
+ self._handle_partition_installation()
+
+ if not SysInfo.has_uefi():
+ self._base_packages.append('grub')
+
+ if ucode := self._get_microcode():
+ (self.target / 'boot' / ucode).unlink(missing_ok=True)
+ self._base_packages.append(ucode.stem)
+ else:
+ debug('Archinstall will not install any ucode.')
# Determine whether to enable multilib/testing repositories before running pacstrap if testing flag is set.
# This action takes place on the host system as pacstrap copies over package repository lists.
+ pacman_conf = pacman.Config(self.target)
if multilib:
- self.log("The multilib flag is set. This system will be installed with the multilib repository enabled.")
- self.enable_multilib_repository()
+ info("The multilib flag is set. This system will be installed with the multilib repository enabled.")
+ pacman_conf.enable(pacman.Repo.Multilib)
else:
- self.log("The multilib flag is not set. This system will be installed without multilib repositories enabled.")
+ info("The multilib flag is not set. This system will be installed without multilib repositories enabled.")
if testing:
- self.log("The testing flag is set. This system will be installed with testing repositories enabled.")
- self.enable_testing_repositories(multilib)
+ info("The testing flag is set. This system will be installed with testing repositories enabled.")
+ pacman_conf.enable(pacman.Repo.Testing)
else:
- self.log("The testing flag is not set. This system will be installed without testing repositories enabled.")
+ info("The testing flag is not set. This system will be installed without testing repositories enabled.")
+
+ pacman_conf.apply()
- self.pacstrap(self.base_packages)
+ self.pacman.strap(self._base_packages)
self.helper_flags['base-strapped'] = True
- # This handles making sure that the repositories we enabled persist on the installed system
- if multilib or testing:
- shutil.copy2("/etc/pacman.conf", f"{self.target}/etc/pacman.conf")
+ pacman_conf.persist()
# Periodic TRIM may improve the performance and longevity of SSDs whilst
# having no adverse effect on other devices. Most distributions enable
# periodic TRIM by default.
#
# https://github.com/archlinux/archinstall/issues/880
- self.enable_periodic_trim()
+ # https://github.com/archlinux/archinstall/issues/1837
+ # https://github.com/archlinux/archinstall/issues/1841
+ if not self._disable_fstrim:
+ self.enable_periodic_trim()
# TODO: Support locale and timezone
# os.remove(f'{self.target}/etc/localtime')
# sys_command(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{localtime} /etc/localtime')
# sys_command('/usr/bin/arch-chroot /mnt hwclock --hctosys --localtime')
self.set_hostname(hostname)
- self.set_locale(*locales[0].split())
+ self.set_locale(locale_config)
+ self.set_keyboard_language(locale_config.kb_layout)
# TODO: Use python functions for this
SysCommand(f'/usr/bin/arch-chroot {self.target} chmod 700 /root')
- self.mkinitcpio('-P')
+ if mkinitcpio and not self.mkinitcpio(['-P']):
+ error('Error generating initramfs (continuing anyway)')
self.helper_flags['base'] = True
# Run registered post-install hooks
for function in self.post_base_install:
- self.log(f"Running post-installation hook: {function}", level=logging.INFO)
+ info(f"Running post-installation hook: {function}")
function(self)
for plugin in plugins.values():
if hasattr(plugin, 'on_install'):
plugin.on_install(self)
- return True
-
- def setup_swap(self, kind :str = 'zram') -> bool:
+ def setup_swap(self, kind: str = 'zram'):
if kind == 'zram':
- self.log(f"Setting up swap on zram")
- self.pacstrap('zram-generator')
+ info(f"Setting up swap on zram")
+ self.pacman.strap('zram-generator')
# We could use the default example below, but maybe not the best idea: https://github.com/archlinux/archinstall/pull/678#issuecomment-962124813
# zram_example_location = '/usr/share/doc/zram-generator/zram-generator.conf.example'
@@ -827,224 +893,532 @@ class Installer:
self.enable_service('systemd-zram-setup@zram0.service')
self._zram_enabled = True
-
- return True
else:
raise ValueError(f"Archinstall currently only supports setting up swap on zram")
- def add_systemd_bootloader(self, boot_partition :Partition, root_partition :Partition) -> bool:
- self.pacstrap('efibootmgr')
+ def _get_efi_partition(self) -> Optional[disk.PartitionModification]:
+ for layout in self._disk_config.device_modifications:
+ if partition := layout.get_efi_partition():
+ return partition
+ return None
+
+ def _get_boot_partition(self) -> Optional[disk.PartitionModification]:
+ for layout in self._disk_config.device_modifications:
+ if boot := layout.get_boot_partition():
+ return boot
+ return None
+
+ def _get_root(self) -> Optional[disk.PartitionModification | disk.LvmVolume]:
+ if self._disk_config.lvm_config:
+ return self._disk_config.lvm_config.get_root_volume()
+ else:
+ for mod in self._disk_config.device_modifications:
+ if root := mod.get_root_partition():
+ return root
+ return None
+
+ def _get_luks_uuid_from_mapper_dev(self, mapper_dev_path: Path) -> str:
+ lsblk_info = disk.get_lsblk_info(mapper_dev_path, reverse=True, full_dev_path=True)
+
+ if not lsblk_info.children or not lsblk_info.children[0].uuid:
+ raise ValueError('Unable to determine UUID of luks superblock')
+
+ return lsblk_info.children[0].uuid
+
+ def _get_kernel_params_partition(
+ self,
+ root_partition: disk.PartitionModification,
+ id_root: bool = True,
+ partuuid: bool = True
+ ) -> List[str]:
+ kernel_parameters = []
+
+ if root_partition in self._disk_encryption.partitions:
+ # TODO: We need to detect if the encrypted device is a whole disk encryption,
+ # or simply a partition encryption. Right now we assume it's a partition (and we always have)
+
+ if self._disk_encryption and self._disk_encryption.hsm_device:
+ debug(f'Root partition is an encrypted device, identifying by UUID: {root_partition.uuid}')
+ # Note: UUID must be used, not PARTUUID for sd-encrypt to work
+ kernel_parameters.append(f'rd.luks.name={root_partition.uuid}=root')
+ # Note: tpm2-device and fido2-device don't play along very well:
+ # https://github.com/archlinux/archinstall/pull/1196#issuecomment-1129715645
+ kernel_parameters.append('rd.luks.options=fido2-device=auto,password-echo=no')
+ elif partuuid:
+ debug(f'Root partition is an encrypted device, identifying by PARTUUID: {root_partition.partuuid}')
+ kernel_parameters.append(f'cryptdevice=PARTUUID={root_partition.partuuid}:root')
+ else:
+ debug(f'Root partition is an encrypted device, identifying by UUID: {root_partition.uuid}')
+ kernel_parameters.append(f'cryptdevice=UUID={root_partition.uuid}:root')
+
+ if id_root:
+ kernel_parameters.append('root=/dev/mapper/root')
+ elif id_root:
+ if partuuid:
+ debug(f'Identifying root partition by PARTUUID: {root_partition.partuuid}')
+ kernel_parameters.append(f'root=PARTUUID={root_partition.partuuid}')
+ else:
+ debug(f'Identifying root partition by UUID: {root_partition.uuid}')
+ kernel_parameters.append(f'root=UUID={root_partition.uuid}')
+
+ return kernel_parameters
+
+ def _get_kernel_params_lvm(
+ self,
+ lvm: disk.LvmVolume
+ ) -> List[str]:
+ kernel_parameters = []
+
+ match self._disk_encryption.encryption_type:
+ case disk.EncryptionType.LvmOnLuks:
+ if not lvm.vg_name:
+ raise ValueError(f'Unable to determine VG name for {lvm.name}')
+
+ pv_seg_info = disk.device_handler.lvm_pvseg_info(lvm.vg_name, lvm.name)
- if not has_uefi():
+ if not pv_seg_info:
+ raise ValueError(f'Unable to determine PV segment info for {lvm.vg_name}/{lvm.name}')
+
+ uuid = self._get_luks_uuid_from_mapper_dev(pv_seg_info.pv_name)
+
+ if self._disk_encryption.hsm_device:
+ debug(f'LvmOnLuks, encrypted root partition, HSM, identifying by UUID: {uuid}')
+ kernel_parameters.append(f'rd.luks.name={uuid}=cryptlvm root={lvm.safe_dev_path}')
+ else:
+ debug(f'LvmOnLuks, encrypted root partition, identifying by UUID: {uuid}')
+ kernel_parameters.append(f'cryptdevice=UUID={uuid}:cryptlvm root={lvm.safe_dev_path}')
+ case disk.EncryptionType.LuksOnLvm:
+ uuid = self._get_luks_uuid_from_mapper_dev(lvm.mapper_path)
+
+ if self._disk_encryption.hsm_device:
+ debug(f'LuksOnLvm, encrypted root partition, HSM, identifying by UUID: {uuid}')
+ kernel_parameters.append(f'rd.luks.name={uuid}=root root=/dev/mapper/root')
+ else:
+ debug(f'LuksOnLvm, encrypted root partition, identifying by UUID: {uuid}')
+ kernel_parameters.append(f'cryptdevice=UUID={uuid}:root root=/dev/mapper/root')
+ case disk.EncryptionType.NoEncryption:
+ debug(f'Identifying root lvm by mapper device: {lvm.dev_path}')
+ kernel_parameters.append(f'root={lvm.safe_dev_path}')
+
+ return kernel_parameters
+
+ def _get_kernel_params(
+ self,
+ root: disk.PartitionModification | disk.LvmVolume,
+ id_root: bool = True,
+ partuuid: bool = True
+ ) -> List[str]:
+ kernel_parameters = []
+
+ if isinstance(root, disk.LvmVolume):
+ kernel_parameters = self._get_kernel_params_lvm(root)
+ else:
+ kernel_parameters = self._get_kernel_params_partition(root, id_root, partuuid)
+
+ # Zswap should be disabled when using zram.
+ # https://github.com/archlinux/archinstall/issues/881
+ if self._zram_enabled:
+ kernel_parameters.append('zswap.enabled=0')
+
+ if id_root:
+ for sub_vol in root.btrfs_subvols:
+ if sub_vol.is_root():
+ kernel_parameters.append(f'rootflags=subvol={sub_vol.name}')
+ break
+
+ kernel_parameters.append('rw')
+
+ kernel_parameters.append(f'rootfstype={root.safe_fs_type.fs_type_mount}')
+ kernel_parameters.extend(self._kernel_params)
+
+ debug(f'kernel parameters: {" ".join(kernel_parameters)}')
+
+ return kernel_parameters
+
+ def _add_systemd_bootloader(
+ self,
+ boot_partition: disk.PartitionModification,
+ root: disk.PartitionModification | disk.LvmVolume,
+ efi_partition: Optional[disk.PartitionModification],
+ uki_enabled: bool = False
+ ):
+ debug('Installing systemd bootloader')
+
+ self.pacman.strap('efibootmgr')
+
+ if not SysInfo.has_uefi():
raise HardwareIncompatibilityError
+
# TODO: Ideally we would want to check if another config
# points towards the same disk and/or partition.
# And in which case we should do some clean up.
+ bootctl_options = []
+
+ if efi_partition and boot_partition != efi_partition:
+ bootctl_options.append(f'--esp-path={efi_partition.mountpoint}')
+ bootctl_options.append(f'--boot-path={boot_partition.mountpoint}')
# Install the boot loader
try:
- SysCommand(f'/usr/bin/arch-chroot {self.target} bootctl --path=/boot install')
+ SysCommand(f"/usr/bin/arch-chroot {self.target} bootctl {' '.join(bootctl_options)} install")
except SysCallError:
# Fallback, try creating the boot loader without touching the EFI variables
- SysCommand(f'/usr/bin/arch-chroot {self.target} bootctl --no-variables --path=/boot install')
+ SysCommand(f"/usr/bin/arch-chroot {self.target} bootctl --no-variables {' '.join(bootctl_options)} install")
- # Ensure that the /boot/loader directory exists before we try to create files in it
- if not os.path.exists(f'{self.target}/boot/loader'):
- os.makedirs(f'{self.target}/boot/loader')
+ # Ensure that the $BOOT/loader/ directory exists before we try to create files in it.
+ #
+ # As mentioned in https://github.com/archlinux/archinstall/pull/1859 - we store the
+ # loader entries in $BOOT/loader/ rather than $ESP/loader/
+ # The current reasoning being that $BOOT works in both use cases as well
+ # as being tied to the current installation. This may change.
+ loader_dir = self.target / 'boot/loader'
+ loader_dir.mkdir(parents=True, exist_ok=True)
+
+ default_kernel = self.kernels[0]
+ if uki_enabled:
+ default_entry = f'arch-{default_kernel}.efi'
+ else:
+ entry_name = self.init_time + '_{kernel}{variant}.conf'
+ default_entry = entry_name.format(kernel=default_kernel, variant='')
+
+ default = f'default {default_entry}'
# Modify or create a loader.conf
- if os.path.isfile(f'{self.target}/boot/loader/loader.conf'):
- with open(f'{self.target}/boot/loader/loader.conf', 'r') as loader:
- loader_data = loader.read().split('\n')
- else:
+ loader_conf = loader_dir / 'loader.conf'
+
+ try:
+ loader_data = loader_conf.read_text().splitlines()
+ except FileNotFoundError:
loader_data = [
- f"default {self.init_time}",
- "timeout 15"
+ default,
+ 'timeout 15'
]
-
- with open(f'{self.target}/boot/loader/loader.conf', 'w') as loader:
- for line in loader_data:
- if line[:8] == 'default ':
- loader.write(f'default {self.init_time}_{self.kernels[0]}\n')
- elif line[:8] == '#timeout' and 'timeout 15' not in loader_data:
+ else:
+ for index, line in enumerate(loader_data):
+ if line.startswith('default'):
+ loader_data[index] = default
+ elif line.startswith('#timeout'):
# We add in the default timeout to support dual-boot
- loader.write(f"{line[1:]}\n")
- else:
- loader.write(f"{line}\n")
+ loader_data[index] = line.removeprefix('#')
+
+ loader_conf.write_text('\n'.join(loader_data) + '\n')
- # Ensure that the /boot/loader/entries directory exists before we try to create files in it
- if not os.path.exists(f'{self.target}/boot/loader/entries'):
- os.makedirs(f'{self.target}/boot/loader/entries')
+ if uki_enabled:
+ return
+
+ # Ensure that the $BOOT/loader/entries/ directory exists before we try to create files in it
+ entries_dir = loader_dir / 'entries'
+ entries_dir.mkdir(parents=True, exist_ok=True)
+
+ comments = (
+ '# Created by: archinstall',
+ f'# Created on: {self.init_time}'
+ )
+
+ options = 'options ' + ' '.join(self._get_kernel_params(root))
for kernel in self.kernels:
for variant in ("", "-fallback"):
# Setup the loader entry
- with open(f'{self.target}/boot/loader/entries/{self.init_time}_{kernel}{variant}.conf', 'w') as entry:
- entry.write('# Created by: archinstall\n')
- entry.write(f'# Created on: {self.init_time}\n')
- entry.write(f'title Arch Linux ({kernel}{variant})\n')
- entry.write(f"linux /vmlinuz-{kernel}\n")
- if not is_vm():
- vendor = cpu_vendor()
- if vendor == "AuthenticAMD":
- entry.write("initrd /amd-ucode.img\n")
- elif vendor == "GenuineIntel":
- entry.write("initrd /intel-ucode.img\n")
- else:
- self.log(f"Unknown CPU vendor '{vendor}' detected. Archinstall won't add any ucode to systemd-boot config.", level=logging.DEBUG)
- entry.write(f"initrd /initramfs-{kernel}{variant}.img\n")
- # blkid doesn't trigger on loopback devices really well,
- # so we'll use the old manual method until we get that sorted out.
- root_fs_type = get_mount_fs_type(root_partition.filesystem)
-
- if root_fs_type is not None:
- options_entry = f'rw rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}\n'
- else:
- options_entry = f'rw {" ".join(self.KERNEL_PARAMS)}\n'
-
- for subvolume in root_partition.subvolumes:
- if subvolume.root is True and subvolume.name != '<FS_TREE>':
- options_entry = f"rootflags=subvol={subvolume.name} " + options_entry
-
- # Zswap should be disabled when using zram.
- #
- # https://github.com/archlinux/archinstall/issues/881
- if self._zram_enabled:
- options_entry = "zswap.enabled=0 " + options_entry
-
- if real_device := self.detect_encryption(root_partition):
- # TODO: We need to detect if the encrypted device is a whole disk encryption,
- # or simply a partition encryption. Right now we assume it's a partition (and we always have)
- log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}/{real_device.part_uuid}'.", level=logging.DEBUG)
-
- kernel_options = f"options"
-
- if self._disk_encryption.hsm_device:
- # Note: lsblk UUID must be used, not PARTUUID for sd-encrypt to work
- kernel_options += f" rd.luks.name={real_device.uuid}=luksdev"
- # Note: tpm2-device and fido2-device don't play along very well:
- # https://github.com/archlinux/archinstall/pull/1196#issuecomment-1129715645
- kernel_options += f" rd.luks.options=fido2-device=auto,password-echo=no"
- else:
- kernel_options += f" cryptdevice=PARTUUID={real_device.part_uuid}:luksdev"
-
- entry.write(f'{kernel_options} root=/dev/mapper/luksdev {options_entry}')
-
- if self._disk_encryption and self._disk_encryption.hsm_device:
- # Note: lsblk UUID must be used, not PARTUUID for sd-encrypt to work
- kernel_options += f" rd.luks.name={real_device.uuid}=luksdev"
- # Note: tpm2-device and fido2-device don't play along very well:
- # https://github.com/archlinux/archinstall/pull/1196#issuecomment-1129715645
- kernel_options += f" rd.luks.options=fido2-device=auto,password-echo=no"
- else:
- log(f"Identifying root partition by PARTUUID on {root_partition}, looking for '{root_partition.part_uuid}'.", level=logging.DEBUG)
- entry.write(f'options root=PARTUUID={root_partition.part_uuid} {options_entry}')
+ entry = [
+ *comments,
+ f'title Arch Linux ({kernel}{variant})',
+ f'linux /vmlinuz-{kernel}',
+ f'initrd /initramfs-{kernel}{variant}.img',
+ options,
+ ]
- self.helper_flags['bootloader'] = "systemd"
+ name = entry_name.format(kernel=kernel, variant=variant)
+ entry_conf = entries_dir / name
+ entry_conf.write_text('\n'.join(entry) + '\n')
- return True
+ self.helper_flags['bootloader'] = 'systemd'
- def add_grub_bootloader(self, boot_partition :Partition, root_partition :Partition) -> bool:
- self.pacstrap('grub') # no need?
+ def _add_grub_bootloader(
+ self,
+ boot_partition: disk.PartitionModification,
+ root: disk.PartitionModification | disk.LvmVolume,
+ efi_partition: Optional[disk.PartitionModification]
+ ):
+ debug('Installing grub bootloader')
- root_fs_type = get_mount_fs_type(root_partition.filesystem)
+ self.pacman.strap('grub') # no need?
- if real_device := self.detect_encryption(root_partition):
- root_uuid = SysCommand(f"blkid -s UUID -o value {real_device.path}").decode().rstrip()
- _file = "/etc/default/grub"
- add_to_CMDLINE_LINUX = f"sed -i 's/GRUB_CMDLINE_LINUX=\"\"/GRUB_CMDLINE_LINUX=\"cryptdevice=UUID={root_uuid}:cryptlvm rootfstype={root_fs_type}\"/'"
- enable_CRYPTODISK = "sed -i 's/#GRUB_ENABLE_CRYPTODISK=y/GRUB_ENABLE_CRYPTODISK=y/'"
+ grub_default = self.target / 'etc/default/grub'
+ config = grub_default.read_text()
- log(f"Using UUID {root_uuid} of {real_device} as encrypted root identifier.", level=logging.INFO)
- SysCommand(f"/usr/bin/arch-chroot {self.target} {add_to_CMDLINE_LINUX} {_file}")
- SysCommand(f"/usr/bin/arch-chroot {self.target} {enable_CRYPTODISK} {_file}")
- else:
- _file = "/etc/default/grub"
- add_to_CMDLINE_LINUX = f"sed -i 's/GRUB_CMDLINE_LINUX=\"\"/GRUB_CMDLINE_LINUX=\"rootfstype={root_fs_type}\"/'"
- SysCommand(f"/usr/bin/arch-chroot {self.target} {add_to_CMDLINE_LINUX} {_file}")
+ kernel_parameters = ' '.join(self._get_kernel_params(root, False, False))
+ config = re.sub(r'(GRUB_CMDLINE_LINUX=")("\n)', rf'\1{kernel_parameters}\2', config, 1)
+
+ grub_default.write_text(config)
+
+ info(f"GRUB boot partition: {boot_partition.dev_path}")
+
+ boot_dir = Path('/boot')
+
+ command = [
+ '/usr/bin/arch-chroot',
+ str(self.target),
+ 'grub-install',
+ '--debug'
+ ]
+
+ if SysInfo.has_uefi():
+ if not efi_partition:
+ raise ValueError('Could not detect efi partition')
+
+ info(f"GRUB EFI partition: {efi_partition.dev_path}")
+
+ self.pacman.strap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead?
+
+ boot_dir_arg = []
+ if boot_partition.mountpoint and boot_partition.mountpoint != boot_dir:
+ boot_dir_arg.append(f'--boot-directory={boot_partition.mountpoint}')
+ boot_dir = boot_partition.mountpoint
+
+ add_options = [
+ '--target=x86_64-efi',
+ f'--efi-directory={efi_partition.mountpoint}',
+ *boot_dir_arg,
+ '--bootloader-id=GRUB',
+ '--removable'
+ ]
+
+ command.extend(add_options)
- log(f"GRUB uses {boot_partition.path} as the boot partition.", level=logging.INFO)
- if has_uefi():
- self.pacstrap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead?
try:
- SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable', peek_output=True)
+ SysCommand(command, peek_output=True)
except SysCallError:
try:
- SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable', peek_output=True)
- except SysCallError as error:
- raise DiskError(f"Could not install GRUB to {self.target}/boot: {error}")
+ SysCommand(command, peek_output=True)
+ except SysCallError as err:
+ raise DiskError(f"Could not install GRUB to {self.target}{efi_partition.mountpoint}: {err}")
else:
+ info(f"GRUB boot partition: {boot_partition.dev_path}")
+
+ parent_dev_path = disk.device_handler.get_parent_device_path(boot_partition.safe_dev_path)
+
+ add_options = [
+ '--target=i386-pc',
+ '--recheck',
+ str(parent_dev_path)
+ ]
+
try:
- SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=i386-pc --recheck {boot_partition.parent}', peek_output=True)
- except SysCallError as error:
- raise DiskError(f"Could not install GRUB to {boot_partition.path}: {error}")
+ SysCommand(command + add_options, peek_output=True)
+ except SysCallError as err:
+ raise DiskError(f"Failed to install GRUB boot on {boot_partition.dev_path}: {err}")
try:
- SysCommand(f'/usr/bin/arch-chroot {self.target} grub-mkconfig -o /boot/grub/grub.cfg')
- except SysCallError as error:
- raise DiskError(f"Could not configure GRUB: {error}")
+ SysCommand(
+ f'/usr/bin/arch-chroot {self.target} '
+ f'grub-mkconfig -o {boot_dir}/grub/grub.cfg'
+ )
+ except SysCallError as err:
+ raise DiskError(f"Could not configure GRUB: {err}")
self.helper_flags['bootloader'] = "grub"
- return True
+ def _add_limine_bootloader(
+ self,
+ boot_partition: disk.PartitionModification,
+ efi_partition: Optional[disk.PartitionModification],
+ root: disk.PartitionModification | disk.LvmVolume
+ ):
+ debug('Installing limine bootloader')
+
+ self.pacman.strap('limine')
+
+ info(f"Limine boot partition: {boot_partition.dev_path}")
+
+ limine_path = self.target / 'usr' / 'share' / 'limine'
+ hook_command = None
+
+ if SysInfo.has_uefi():
+ if not efi_partition:
+ raise ValueError('Could not detect efi partition')
+ elif not efi_partition.mountpoint:
+ raise ValueError('EFI partition is not mounted')
+
+ info(f"Limine EFI partition: {efi_partition.dev_path}")
+
+ try:
+ efi_dir_path = self.target / efi_partition.mountpoint.relative_to('/') / 'EFI' / 'BOOT'
+ efi_dir_path.mkdir(parents=True, exist_ok=True)
+
+ for file in ('BOOTIA32.EFI', 'BOOTX64.EFI'):
+ shutil.copy(limine_path / file, efi_dir_path)
+ except Exception as err:
+ raise DiskError(f'Failed to install Limine in {self.target}{efi_partition.mountpoint}: {err}')
+
+ hook_command = f'/usr/bin/cp /usr/share/limine/BOOTIA32.EFI {efi_partition.mountpoint}/EFI/BOOT/' \
+ f' && /usr/bin/cp /usr/share/limine/BOOTX64.EFI {efi_partition.mountpoint}/EFI/BOOT/'
+ else:
+ parent_dev_path = disk.device_handler.get_parent_device_path(boot_partition.safe_dev_path)
+
+ if unique_path := disk.device_handler.get_unique_path_for_device(parent_dev_path):
+ parent_dev_path = unique_path
+
+ try:
+ # The `limine-bios.sys` file contains stage 3 code.
+ shutil.copy(limine_path / 'limine-bios.sys', self.target / 'boot')
+
+ # `limine bios-install` deploys the stage 1 and 2 to the disk.
+ SysCommand(f'/usr/bin/arch-chroot {self.target} limine bios-install {parent_dev_path}', peek_output=True)
+ except Exception as err:
+ raise DiskError(f'Failed to install Limine on {parent_dev_path}: {err}')
+
+ hook_command = f'/usr/bin/limine bios-install {parent_dev_path}' \
+ f' && /usr/bin/cp /usr/share/limine/limine-bios.sys /boot/'
+
+ hook_contents = f'''[Trigger]
+Operation = Install
+Operation = Upgrade
+Type = Package
+Target = limine
+
+[Action]
+Description = Deploying Limine after upgrade...
+When = PostTransaction
+Exec = /bin/sh -c "{hook_command}"
+'''
+
+ hooks_dir = self.target / 'etc' / 'pacman.d' / 'hooks'
+ hooks_dir.mkdir(parents=True, exist_ok=True)
+
+ hook_path = hooks_dir / '99-limine.hook'
+ hook_path.write_text(hook_contents)
+
+ kernel_params = ' '.join(self._get_kernel_params(root))
+ config_contents = 'TIMEOUT=5\n'
+
+ for kernel in self.kernels:
+ for variant in ('', '-fallback'):
+ entry = [
+ f'PROTOCOL=linux',
+ f'KERNEL_PATH=boot:///vmlinuz-{kernel}',
+ f'MODULE_PATH=boot:///initramfs-{kernel}{variant}.img',
+ f'CMDLINE={kernel_params}',
+ ]
+
+ config_contents += f'\n:Arch Linux ({kernel}{variant})\n'
+ config_contents += '\n'.join([f' {it}' for it in entry]) + '\n'
- def add_efistub_bootloader(self, boot_partition :Partition, root_partition :Partition) -> bool:
- self.pacstrap('efibootmgr')
+ config_path = self.target / 'boot' / 'limine.cfg'
+ config_path.write_text(config_contents)
- if not has_uefi():
+ self.helper_flags['bootloader'] = "limine"
+
+ def _add_efistub_bootloader(
+ self,
+ boot_partition: disk.PartitionModification,
+ root: disk.PartitionModification | disk.LvmVolume,
+ uki_enabled: bool = False
+ ):
+ debug('Installing efistub bootloader')
+
+ self.pacman.strap('efibootmgr')
+
+ if not SysInfo.has_uefi():
raise HardwareIncompatibilityError
+
# TODO: Ideally we would want to check if another config
# points towards the same disk and/or partition.
# And in which case we should do some clean up.
- root_fs_type = get_mount_fs_type(root_partition.filesystem)
+ if not uki_enabled:
+ loader = '/vmlinuz-{kernel}'
+
+ entries = (
+ 'initrd=/initramfs-{kernel}.img',
+ *self._get_kernel_params(root)
+ )
+
+ cmdline = [' '.join(entries)]
+ else:
+ loader = '/EFI/Linux/arch-{kernel}.efi'
+ cmdline = []
+
+ parent_dev_path = disk.device_handler.get_parent_device_path(boot_partition.safe_dev_path)
+
+ cmd_template = (
+ 'efibootmgr',
+ '--create',
+ '--disk', str(parent_dev_path),
+ '--part', str(boot_partition.partn),
+ '--label', 'Arch Linux ({kernel})',
+ '--loader', loader,
+ '--unicode', *cmdline,
+ '--verbose'
+ )
for kernel in self.kernels:
# Setup the firmware entry
+ cmd = [arg.format(kernel=kernel) for arg in cmd_template]
+ SysCommand(cmd)
+
+ self.helper_flags['bootloader'] = "efistub"
- label = f'Arch Linux ({kernel})'
- loader = f"/vmlinuz-{kernel}"
+ def _config_uki(
+ self,
+ root: disk.PartitionModification | disk.LvmVolume,
+ efi_partition: Optional[disk.PartitionModification]
+ ):
+ if not efi_partition or not efi_partition.mountpoint:
+ raise ValueError(f'Could not detect ESP at mountpoint {self.target}')
- kernel_parameters = []
+ # Set up kernel command line
+ with open(self.target / 'etc/kernel/cmdline', 'w') as cmdline:
+ kernel_parameters = self._get_kernel_params(root)
+ cmdline.write(' '.join(kernel_parameters) + '\n')
- if not is_vm():
- vendor = cpu_vendor()
- if vendor == "AuthenticAMD":
- kernel_parameters.append("initrd=\\amd-ucode.img")
- elif vendor == "GenuineIntel":
- kernel_parameters.append("initrd=\\intel-ucode.img")
- else:
- self.log(f"Unknown CPU vendor '{vendor}' detected. Archinstall won't add any ucode to firmware boot entry.", level=logging.DEBUG)
+ diff_mountpoint = None
- kernel_parameters.append(f"initrd=\\initramfs-{kernel}.img")
+ if efi_partition.mountpoint != Path('/efi'):
+ diff_mountpoint = str(efi_partition.mountpoint)
- # blkid doesn't trigger on loopback devices really well,
- # so we'll use the old manual method until we get that sorted out.
- if real_device := self.detect_encryption(root_partition):
- # TODO: We need to detect if the encrypted device is a whole disk encryption,
- # or simply a partition encryption. Right now we assume it's a partition (and we always have)
- log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.part_uuid}'.", level=logging.DEBUG)
- kernel_parameters.append(f'cryptdevice=PARTUUID={real_device.part_uuid}:luksdev root=/dev/mapper/luksdev rw rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}')
- else:
- log(f"Identifying root partition by PART-UUID on {root_partition}, looking for '{root_partition.part_uuid}'.", level=logging.DEBUG)
- kernel_parameters.append(f'root=PARTUUID={root_partition.part_uuid} rw rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}')
+ image_re = re.compile('(.+_image="/([^"]+).+\n)')
+ uki_re = re.compile('#((.+_uki=")/[^/]+(.+\n))')
- SysCommand(f'efibootmgr --disk {boot_partition.path[:-1]} --part {boot_partition.path[-1]} --create --label "{label}" --loader {loader} --unicode \'{" ".join(kernel_parameters)}\' --verbose')
+ # Modify .preset files
+ for kernel in self.kernels:
+ preset = self.target / 'etc/mkinitcpio.d' / (kernel + '.preset')
+ config = preset.read_text().splitlines(True)
+
+ for index, line in enumerate(config):
+ # Avoid storing redundant image file
+ if m := image_re.match(line):
+ image = self.target / m.group(2)
+ image.unlink(missing_ok=True)
+ config[index] = '#' + m.group(1)
+ elif m := uki_re.match(line):
+ if diff_mountpoint:
+ config[index] = m.group(2) + diff_mountpoint + m.group(3)
+ else:
+ config[index] = m.group(1)
+ elif line.startswith('#default_options='):
+ config[index] = line.removeprefix('#')
- self.helper_flags['bootloader'] = "efistub"
+ preset.write_text(''.join(config))
- return True
+ # Directory for the UKIs
+ uki_dir = self.target / efi_partition.relative_mountpoint / 'EFI/Linux'
+ uki_dir.mkdir(parents=True, exist_ok=True)
- def add_bootloader(self, bootloader :str = 'systemd-bootctl') -> bool:
+ # Build the UKIs
+ if not self.mkinitcpio(['-P']):
+ error('Error generating initramfs (continuing anyway)')
+
+ def add_bootloader(self, bootloader: Bootloader, uki_enabled: bool = False):
"""
Adds a bootloader to the installation instance.
Archinstall supports one of three types:
* systemd-bootctl
* grub
+ * limine (beta)
* efistub (beta)
- :param bootloader: Can be one of the three strings
- 'systemd-bootctl', 'grub' or 'efistub' (beta)
+ :param bootloader: Type of bootloader to be added
"""
for plugin in plugins.values():
@@ -1054,61 +1428,41 @@ class Installer:
if plugin.on_add_bootloader(self):
return True
- if type(self.target) == str:
- self.target = pathlib.Path(self.target)
+ efi_partition = self._get_efi_partition()
+ boot_partition = self._get_boot_partition()
+ root = self._get_root()
- boot_partition = None
- root_partition = None
- for partition in self.partitions:
- if self.target / 'boot' in partition.mountpoints:
- boot_partition = partition
- elif self.target in partition.mountpoints:
- root_partition = partition
+ if boot_partition is None:
+ raise ValueError(f'Could not detect boot at mountpoint {self.target}')
- if boot_partition is None or root_partition is None:
- raise ValueError(f"Could not detect root ({root_partition}) or boot ({boot_partition}) in {self.target} based on: {self.partitions}")
+ if root is None:
+ raise ValueError(f'Could not detect root at mountpoint {self.target}')
- self.log(f'Adding bootloader {bootloader} to {boot_partition if boot_partition else root_partition}', level=logging.INFO)
+ info(f'Adding bootloader {bootloader.value} to {boot_partition.dev_path}')
- if bootloader == 'systemd-bootctl':
- self.add_systemd_bootloader(boot_partition, root_partition)
- elif bootloader == "grub-install":
- self.add_grub_bootloader(boot_partition, root_partition)
- elif bootloader == 'efistub':
- self.add_efistub_bootloader(boot_partition, root_partition)
- else:
- raise RequirementError(f"Unknown (or not yet implemented) bootloader requested: {bootloader}")
+ if uki_enabled:
+ self._config_uki(root, efi_partition)
- return True
-
- def add_additional_packages(self, *packages :str) -> bool:
- return self.pacstrap(*packages)
+ match bootloader:
+ case Bootloader.Systemd:
+ self._add_systemd_bootloader(boot_partition, root, efi_partition, uki_enabled)
+ case Bootloader.Grub:
+ self._add_grub_bootloader(boot_partition, root, efi_partition)
+ case Bootloader.Efistub:
+ self._add_efistub_bootloader(boot_partition, root, uki_enabled)
+ case Bootloader.Limine:
+ self._add_limine_bootloader(boot_partition, efi_partition, root)
- def install_profile(self, profile :str) -> ModuleType:
- """
- Installs a archinstall profile script (.py file).
- This profile can be either local, remote or part of the library.
+ def add_additional_packages(self, packages: Union[str, List[str]]) -> bool:
+ return self.pacman.strap(packages)
- :param profile: Can be a local path or a remote path (URL)
- :return: Returns the imported script as a module, this way
- you can access any remaining functions exposed by the profile.
- :rtype: module
- """
- storage['installation_session'] = self
-
- if type(profile) == str:
- profile = Profile(self, profile)
-
- self.log(f'Installing archinstall profile {profile}', level=logging.INFO)
- return profile.install()
-
- def enable_sudo(self, entity: str, group :bool = False):
- self.log(f'Enabling sudo permissions for {entity}.', level=logging.INFO)
+ def enable_sudo(self, entity: str, group: bool = False):
+ info(f'Enabling sudo permissions for {entity}')
sudoers_dir = f"{self.target}/etc/sudoers.d"
# Creates directory if not exists
- if not (sudoers_path := pathlib.Path(sudoers_dir)).exists():
+ if not (sudoers_path := Path(sudoers_dir)).exists():
sudoers_path.mkdir(parents=True)
# Guarantees sudoer confs directory recommended perms
os.chmod(sudoers_dir, 0o440)
@@ -1118,7 +1472,7 @@ class Installer:
# We count how many files are there already so we know which number to prefix the file with
num_of_rules_already = len(os.listdir(sudoers_dir))
- file_num_str = "{:02d}".format(num_of_rules_already) # We want 00_user1, 01_user2, etc
+ file_num_str = "{:02d}".format(num_of_rules_already) # We want 00_user1, 01_user2, etc
# Guarantees that entity str does not contain invalid characters for a linux file name:
# \ / : * ? " < > |
@@ -1130,7 +1484,7 @@ class Installer:
sudoers.write(f'{"%" if group else ""}{entity} ALL=(ALL) ALL\n')
# Guarantees sudoer conf file recommended perms
- os.chmod(pathlib.Path(rule_file_name), 0o440)
+ os.chmod(Path(rule_file_name), 0o440)
def create_users(self, users: Union[User, List[User]]):
if not isinstance(users, list):
@@ -1139,7 +1493,8 @@ class Installer:
for user in users:
self.user_create(user.username, user.password, user.groups, user.sudo)
- def user_create(self, user :str, password :Optional[str] = None, groups :Optional[List[str]] = None, sudo :bool = False) -> None:
+ def user_create(self, user: str, password: Optional[str] = None, groups: Optional[List[str]] = None,
+ sudo: bool = False) -> None:
if groups is None:
groups = []
@@ -1152,11 +1507,11 @@ class Installer:
handled_by_plugin = result
if not handled_by_plugin:
- self.log(f'Creating user {user}', level=logging.INFO)
+ info(f'Creating user {user}')
try:
SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}')
- except SysCallError as error:
- raise SystemError(f"Could not create user inside installation: {error}")
+ except SysCallError as err:
+ raise SystemError(f"Could not create user inside installation: {err}")
for plugin in plugins.values():
if hasattr(plugin, 'on_user_created'):
@@ -1173,8 +1528,8 @@ class Installer:
if sudo and self.enable_sudo(user):
self.helper_flags['user'] = True
- def user_set_pw(self, user :str, password :str) -> bool:
- self.log(f'Setting password for {user}', level=logging.INFO)
+ def user_set_pw(self, user: str, password: str) -> bool:
+ info(f'Setting password for {user}')
if user == 'root':
# This means the root account isn't locked/disabled with * in /etc/passwd
@@ -1190,8 +1545,8 @@ class Installer:
except SysCallError:
return False
- def user_set_shell(self, user :str, shell :str) -> bool:
- self.log(f'Setting shell for {user} to {shell}', level=logging.INFO)
+ def user_set_shell(self, user: str, shell: str) -> bool:
+ info(f'Setting shell for {user} to {shell}')
try:
SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"")
@@ -1199,7 +1554,7 @@ class Installer:
except SysCallError:
return False
- def chown(self, owner :str, path :str, options :List[str] = []) -> bool:
+ def chown(self, owner: str, path: str, options: List[str] = []) -> bool:
cleaned_path = path.replace('\'', '\\\'')
try:
SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {cleaned_path}'")
@@ -1207,53 +1562,75 @@ class Installer:
except SysCallError:
return False
- def create_file(self, filename :str, owner :Optional[str] = None) -> InstallationFile:
- return InstallationFile(self, filename, owner)
-
def set_keyboard_language(self, language: str) -> bool:
- log(f"Setting keyboard language to {language}", level=logging.INFO)
+ info(f"Setting keyboard language to {language}")
+
if len(language.strip()):
if not verify_keyboard_layout(language):
- self.log(f"Invalid keyboard language specified: {language}", fg="red", level=logging.ERROR)
+ error(f"Invalid keyboard language specified: {language}")
return False
# In accordance with https://github.com/archlinux/archinstall/issues/107#issuecomment-841701968
# Setting an empty keymap first, allows the subsequent call to set layout for both console and x11.
- from .systemd import Boot
+ from .boot import Boot
with Boot(self) as session:
os.system('/usr/bin/systemd-run --machine=archinstall --pty localectl set-keymap ""')
try:
session.SysCommand(["localectl", "set-keymap", language])
- except SysCallError as error:
- raise ServiceException(f"Unable to set locale '{language}' for console: {error}")
+ except SysCallError as err:
+ raise ServiceException(f"Unable to set locale '{language}' for console: {err}")
- self.log(f"Keyboard language for this installation is now set to: {language}")
+ info(f"Keyboard language for this installation is now set to: {language}")
else:
- self.log('Keyboard language was not changed from default (no language specified).', fg="yellow", level=logging.INFO)
+ info('Keyboard language was not changed from default (no language specified)')
return True
def set_x11_keyboard_language(self, language: str) -> bool:
- log(f"Setting x11 keyboard language to {language}", level=logging.INFO)
"""
A fallback function to set x11 layout specifically and separately from console layout.
This isn't strictly necessary since .set_keyboard_language() does this as well.
"""
+ info(f"Setting x11 keyboard language to {language}")
+
if len(language.strip()):
if not verify_x11_keyboard_layout(language):
- self.log(f"Invalid x11-keyboard language specified: {language}", fg="red", level=logging.ERROR)
+ error(f"Invalid x11-keyboard language specified: {language}")
return False
- from .systemd import Boot
+ from .boot import Boot
with Boot(self) as session:
session.SysCommand(["localectl", "set-x11-keymap", '""'])
try:
session.SysCommand(["localectl", "set-x11-keymap", language])
- except SysCallError as error:
- raise ServiceException(f"Unable to set locale '{language}' for X11: {error}")
+ except SysCallError as err:
+ raise ServiceException(f"Unable to set locale '{language}' for X11: {err}")
else:
- self.log(f'X11-Keyboard language was not changed from default (no language specified).', fg="yellow", level=logging.INFO)
+ info(f'X11-Keyboard language was not changed from default (no language specified)')
return True
+
+ def _service_started(self, service_name: str) -> Optional[str]:
+ if os.path.splitext(service_name)[1] not in ('.service', '.target', '.timer'):
+ service_name += '.service' # Just to be safe
+
+ last_execution_time = SysCommand(
+ f"systemctl show --property=ActiveEnterTimestamp --no-pager {service_name}",
+ environment_vars={'SYSTEMD_COLORS': '0'}
+ ).decode().lstrip('ActiveEnterTimestamp=')
+
+ if not last_execution_time:
+ return None
+
+ return last_execution_time
+
+ def _service_state(self, service_name: str) -> str:
+ if os.path.splitext(service_name)[1] not in ('.service', '.target', '.timer'):
+ service_name += '.service' # Just to be safe
+
+ return SysCommand(
+ f'systemctl show --no-pager -p SubState --value {service_name}',
+ environment_vars={'SYSTEMD_COLORS': '0'}
+ ).decode()