Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/installer.py
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/installer.py')
-rw-r--r--archinstall/lib/installer.py670
1 files changed, 340 insertions, 330 deletions
diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py
index b4d253b3..ddbcc2f2 100644
--- a/archinstall/lib/installer.py
+++ b/archinstall/lib/installer.py
@@ -1,30 +1,29 @@
-import time
+import glob
import logging
import os
import re
-import shutil
import shlex
-import pathlib
+import shutil
import subprocess
-import glob
-from types import ModuleType
-from typing import Union, Dict, Any, List, Optional, Iterator, Mapping, TYPE_CHECKING
-from .disk import get_partitions_in_use, Partition
-from .general import SysCommand, generate_password
+import time
+from pathlib import Path
+from typing import Any, Iterator, List, Mapping, Optional, TYPE_CHECKING, Union, Dict
+
+from . import disk
+from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError, SysCallError
+from .general import SysCommand
from .hardware import has_uefi, is_vm, cpu_vendor
from .locale_helpers import verify_keyboard_layout, verify_x11_keyboard_layout
-from .disk.helpers import findmnt
+from .luks import Luks2
from .mirrors import use_mirrors
-from .models.disk_encryption import DiskEncryption
+from .models.bootloader import Bootloader
+from .models.network_configuration import NetworkConfiguration
+from .models.users import User
+from .output import log
+from .pacman import run_pacman
from .plugins import plugins
+from .services import service_state
from .storage import storage
-from .output import log
-from .profiles import Profile
-from .disk.partition import get_mount_fs_type
-from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError, SysCallError
-from .models.users import User
-from .models.subvolume import Subvolume
-from .hsm import Fido2
if TYPE_CHECKING:
_: Any
@@ -36,9 +35,6 @@ __packages__ = ["base", "base-devel", "linux-firmware", "linux", "linux-lts", "l
# Additional packages that are installed if the user is running the Live ISO with accessibility tools enabled
__accessibility_packages__ = ["brltty", "espeakup", "alsa-utils"]
-from .pacman import run_pacman
-from .models.network_configuration import NetworkConfiguration
-
class InstallationFile:
def __init__(self, installation :'Installer', filename :str, owner :str, mode :str = "w"):
@@ -92,26 +88,35 @@ class Installer:
:param hostname: The given /etc/hostname for the machine.
:type hostname: str, optional
-
"""
-
- def __init__(self, target :str, *, base_packages :Optional[List[str]] = None, kernels :Optional[List[str]] = None):
- if base_packages is None:
+ def __init__(
+ self,
+ target: Path,
+ disk_config: disk.DiskLayoutConfiguration,
+ disk_encryption: Optional[disk.DiskEncryption] = None,
+ base_packages: List[str] = [],
+ kernels: Optional[List[str]] = None
+ ):
+ if not base_packages:
base_packages = __packages__[:3]
+
if kernels is None:
self.kernels = ['linux']
else:
self.kernels = kernels
+
+ self._disk_config = disk_config
+ self._disk_encryption = disk_encryption
+
+ if self._disk_encryption is None:
+ self._disk_encryption = disk.DiskEncryption(disk.EncryptionType.NoEncryption)
+
self.target = target
self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S')
self.milliseconds = int(str(time.time()).split('.')[1])
+ self.helper_flags = {'base': False, 'bootloader': False}
+ self.base_packages = base_packages
- self.helper_flags = {
- 'base': False,
- 'bootloader': False
- }
-
- self.base_packages = base_packages.split(' ') if type(base_packages) is str else base_packages
for kernel in self.kernels:
self.base_packages.append(kernel)
@@ -136,19 +141,10 @@ class Installer:
self._zram_enabled = False
- self._disk_encryption: DiskEncryption = storage['arguments'].get('disk_encryption')
-
- def log(self, *args :str, level :int = logging.DEBUG, **kwargs :str):
- """
- installer.log() wraps output.log() mainly to set a default log-level for this install session.
- Any manual override can be done per log() call.
- """
- log(*args, level=level, **kwargs)
-
- def __enter__(self, *args :str, **kwargs :str) -> 'Installer':
+ def __enter__(self, *args: str, **kwargs: str) -> 'Installer':
return self
- def __exit__(self, *args :str, **kwargs :str) -> None:
+ def __exit__(self, *args :str, **kwargs :str) -> bool:
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]:
@@ -165,7 +161,6 @@ class Installer:
if not (missing_steps := self.post_install_check()):
self.log('Installation completed without any errors. You may now reboot.', fg='green', level=logging.INFO)
self.sync_log_to_install_medium()
-
return True
else:
self.log('Some required steps were not successfully installed/configured before leaving the installer:', fg='red', level=logging.WARNING)
@@ -178,146 +173,168 @@ class Installer:
self.sync_log_to_install_medium()
return False
- @property
- def partitions(self) -> List[Partition]:
- return get_partitions_in_use(self.target).values()
+ def log(self, *args :str, level :int = logging.DEBUG, **kwargs :str):
+ """
+ installer.log() wraps output.log() mainly to set a default log-level for this install session.
+ Any manual override can be done per log() call.
+ """
+ log(*args, level=level, **kwargs)
- def sync_log_to_install_medium(self) -> bool:
- # Copy over the install log (if there is one) to the install medium if
- # at least the base has been strapped in, otherwise we won't have a filesystem/structure to copy to.
- if self.helper_flags.get('base-strapped', False) is True:
- if filename := storage.get('LOG_FILE', None):
- absolute_logfile = os.path.join(storage.get('LOG_PATH', './'), filename)
+ def _verify_service_stop(self):
+ """
+ Certain services might be running that affects the system during installation.
+ Currently, only one such service is "reflector.service" which updates /etc/pacman.d/mirrorlist
+ We need to wait for it before we continue since we opted in to use a custom mirror/region.
+ """
+ log('Waiting for automatic mirror selection (reflector) to complete...', level=logging.INFO)
+ while service_state('reflector') not in ('dead', 'failed', 'exited'):
+ time.sleep(1)
- if not os.path.isdir(f"{self.target}/{os.path.dirname(absolute_logfile)}"):
- os.makedirs(f"{self.target}/{os.path.dirname(absolute_logfile)}")
+ log('Waiting pacman-init.service to complete.', level=logging.INFO)
+ while service_state('pacman-init') not in ('dead', 'failed', 'exited'):
+ time.sleep(1)
- shutil.copy2(absolute_logfile, f"{self.target}/{absolute_logfile}")
+ log('Waiting Arch Linux keyring sync (archlinux-keyring-wkd-sync) to complete.', level=logging.INFO)
+ while service_state('archlinux-keyring-wkd-sync') not in ('dead', 'failed', 'exited'):
+ time.sleep(1)
- return True
+ def _verify_boot_part(self):
+ """
+ Check that mounted /boot device has at minimum size for installation
+ The reason this check is here is to catch pre-mounted device configuration and potentially
+ configured one that has not gone through any previous checks (e.g. --silence mode)
- def _create_keyfile(self,luks_handle , partition :dict, password :str):
- """ roiutine to create keyfiles, so it can be moved elsewhere
+ NOTE: this function should be run AFTER running the mount_ordered_layout function
"""
- if self._disk_encryption and self._disk_encryption.generate_encryption_file(partition):
- if not (cryptkey_dir := pathlib.Path(f"{self.target}/etc/cryptsetup-keys.d")).exists():
- cryptkey_dir.mkdir(parents=True)
- # Once we store the key as ../xyzloop.key systemd-cryptsetup can automatically load this key
- # if we name the device to "xyzloop".
- if partition.get('mountpoint',None):
- encryption_key_path = f"/etc/cryptsetup-keys.d/{pathlib.Path(partition['mountpoint']).name}loop.key"
- else:
- encryption_key_path = f"/etc/cryptsetup-keys.d/{pathlib.Path(partition['device_instance'].path).name}.key"
- with open(f"{self.target}{encryption_key_path}", "w") as keyfile:
- keyfile.write(generate_password(length=512))
+ boot_mount = self.target / 'boot'
+ lsblk_info = disk.get_lsblk_by_mountpoint(boot_mount)
+
+ if len(lsblk_info) > 0:
+ if lsblk_info[0].size < disk.Size(200, disk.Unit.MiB):
+ raise DiskError(
+ f'The boot partition mounted at {boot_mount} is not large enough to install a boot loader. '
+ f'Please resize it to at least 200MiB and re-run the installation.'
+ )
- os.chmod(f"{self.target}{encryption_key_path}", 0o400)
+ def sanity_check(self):
+ self._verify_boot_part()
+ self._verify_service_stop()
- luks_handle.add_key(pathlib.Path(f"{self.target}{encryption_key_path}"), password=password)
- luks_handle.crypttab(self, encryption_key_path, options=["luks", "key-slot=1"])
+ def mount_ordered_layout(self):
+ log('Mounting partitions in order', level=logging.INFO)
- def _has_root(self, partition :dict) -> bool:
- """
- Determine if an encrypted partition contains root in it
- """
- if partition.get("mountpoint") is None:
- if (sub_list := partition.get("btrfs",{}).get('subvolumes',{})):
- for mountpoint in [sub_list[subvolume].get("mountpoint") if isinstance(subvolume, dict) else subvolume.mountpoint for subvolume in sub_list]:
- if mountpoint == '/':
- return True
- return False
+ for mod in self._disk_config.device_modifications:
+ # partitions have to mounted in the right order on btrfs the mountpoint will
+ # be empty as the actual subvolumes are getting mounted instead so we'll use
+ # '/' just for sorting
+ sorted_part_mods = sorted(mod.partitions, key=lambda x: x.mountpoint if x.mountpoint else Path('/'))
+
+ if self._disk_encryption.encryption_type is not disk.EncryptionType.NoEncryption:
+ enc_partitions = list(filter(lambda x: x in self._disk_encryption.partitions, sorted_part_mods))
else:
- return False
- elif partition.get("mountpoint") == '/':
- return True
- else:
- return False
+ enc_partitions = []
- def mount_ordered_layout(self, layouts: Dict[str, Any]) -> None:
- from .luks import luks2
- from .disk.btrfs import setup_subvolumes, mount_subvolume
-
- # set the partitions as a list not part of a tree (which we don't need anymore (i think)
- list_part = []
- list_luks_handles = []
- for blockdevice in layouts:
- list_part.extend(layouts[blockdevice]['partitions'])
-
- # TODO: Implement a proper mount-queue system that does not depend on return values.
- mount_queue = {}
-
- # we manage the encrypted partititons
- if self._disk_encryption:
- for partition in self._disk_encryption.all_partitions:
- # open the luks device and all associate stuff
- loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}"
-
- # note that we DON'T auto_unmount (i.e. close the encrypted device so it can be used
- with (luks_handle := luks2(partition['device_instance'], loopdev, self._disk_encryption.encryption_password, auto_unmount=False)) as unlocked_device:
- if self._disk_encryption.generate_encryption_file(partition) and not self._has_root(partition):
- list_luks_handles.append([luks_handle, partition, self._disk_encryption.encryption_password])
- # this way all the requesrs will be to the dm_crypt device and not to the physical partition
- partition['device_instance'] = unlocked_device
-
- if self._has_root(partition) and self._disk_encryption.generate_encryption_file(partition) is False:
- if self._disk_encryption.hsm_device:
- Fido2.fido2_enroll(self._disk_encryption.hsm_device, partition['device_instance'], self._disk_encryption.encryption_password)
-
- btrfs_subvolumes = [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', [])]
-
- for partition in btrfs_subvolumes:
- device_instance = partition['device_instance']
- mount_options = partition.get('filesystem', {}).get('mount_options', [])
- self.mount(device_instance, "/", options=','.join(mount_options))
- setup_subvolumes(installation=self, partition_dict=partition)
- device_instance.unmount()
-
- # We then handle any special cases, such as btrfs
- for partition in btrfs_subvolumes:
- subvolumes: List[Subvolume] = partition['btrfs']['subvolumes']
- for subvolume in sorted(subvolumes, key=lambda item: item.mountpoint):
- # We cache the mount call for later
- mount_queue[subvolume.mountpoint] = lambda sub_vol=subvolume, device=partition['device_instance']: mount_subvolume(
- installation=self,
- device=device,
- subvolume=sub_vol
- )
+ # attempt to decrypt all luks partitions
+ luks_handlers = self._prepare_luks_partitions(enc_partitions)
- # We mount ordinary partitions, and we sort them by the mountpoint
- for partition in sorted([entry for entry in list_part if entry.get('mountpoint', False)], key=lambda part: part['mountpoint']):
- mountpoint = partition['mountpoint']
- log(f"Mounting {mountpoint} to {self.target}{mountpoint} using {partition['device_instance']}", level=logging.INFO)
+ for part_mod in sorted_part_mods:
+ if part_mod not in luks_handlers: # partition is not encrypted
+ self._mount_partition(part_mod)
+ else: # mount encrypted partition
+ self._mount_luks_partiton(part_mod, luks_handlers[part_mod])
- if partition.get('filesystem',{}).get('mount_options',[]):
- mount_options = ','.join(partition['filesystem']['mount_options'])
- mount_queue[mountpoint] = lambda instance=partition['device_instance'], target=f"{self.target}{mountpoint}", options=mount_options: instance.mount(target, options=options)
- else:
- mount_queue[mountpoint] = lambda instance=partition['device_instance'], target=f"{self.target}{mountpoint}": instance.mount(target)
+ def _prepare_luks_partitions(self, partitions: List[disk.PartitionModification]) -> Dict[disk.PartitionModification, Luks2]:
+ luks_handlers = {}
- log(f"Using mount order: {list(sorted(mount_queue.items(), key=lambda item: item[0]))}", level=logging.DEBUG, fg="white")
+ for part_mod in partitions:
+ luks_handler = disk.device_handler.unlock_luks2_dev(
+ part_mod.dev_path,
+ part_mod.mapper_name,
+ self._disk_encryption.encryption_password
+ )
+ luks_handlers[part_mod] = luks_handler
+
+ return luks_handlers
+
+ def _mount_partition(self, part_mod: disk.PartitionModification):
+ # it would be none if it's btrfs as the subvolumes will have the mountpoints defined
+ if part_mod.mountpoint is not None:
+ target = self.target / part_mod.relative_mountpoint
+ disk.device_handler.mount(part_mod.dev_path, target, options=part_mod.mount_options)
+
+ if part_mod.fs_type == disk.FilesystemType.Btrfs:
+ self._mount_btrfs_subvol(part_mod.dev_path, part_mod.btrfs_subvols)
+
+ def _mount_luks_partiton(self, part_mod: disk.PartitionModification, luks_handler: Luks2):
+ # it would be none if it's btrfs as the subvolumes will have the mountpoints defined
+ if part_mod.mountpoint is not None:
+ target = self.target / part_mod.relative_mountpoint
+ disk.device_handler.mount(luks_handler.mapper_dev, target, options=part_mod.mount_options)
+
+ if part_mod.fs_type == disk.FilesystemType.Btrfs:
+ self._mount_btrfs_subvol(luks_handler.mapper_dev, part_mod.btrfs_subvols)
+
+ def _mount_btrfs_subvol(self, dev_path: Path, subvolumes: List[disk.SubvolumeModification]):
+ for subvol in subvolumes:
+ mountpoint = self.target / subvol.relative_mountpoint
+ mount_options = subvol.mount_options + [f'subvol={subvol.name}']
+ disk.device_handler.mount(dev_path, mountpoint, options=mount_options)
+
+ def generate_key_files(self):
+ for part_mod in self._disk_encryption.partitions:
+ gen_enc_file = self._disk_encryption.should_generate_encryption_file(part_mod)
+
+ luks_handler = Luks2(
+ part_mod.dev_path,
+ mapper_name=part_mod.mapper_name,
+ password=self._disk_encryption.encryption_password
+ )
- # We mount everything by sorting on the mountpoint itself.
- for mountpoint, frozen_func in sorted(mount_queue.items(), key=lambda item: item[0]):
- frozen_func()
+ if gen_enc_file and not part_mod.is_root():
+ log(f'Creating key-file: {part_mod.dev_path}', level=logging.INFO)
+ luks_handler.create_keyfile(self.target)
+ if part_mod.is_root() and not gen_enc_file:
+ if self._disk_encryption.hsm_device:
+ disk.Fido2.fido2_enroll(
+ self._disk_encryption.hsm_device,
+ part_mod,
+ self._disk_encryption.encryption_password
+ )
+
+ def activate_ntp(self):
+ """
+ If NTP is activated, confirm activiation in the ISO and at least one time-sync finishes
+ """
+ SysCommand('timedatectl set-ntp true')
+
+ logged = False
+ while service_state('dbus-org.freedesktop.timesync1.service') not in ['running']:
+ if not logged:
+ log(f"Waiting for dbus-org.freedesktop.timesync1.service to enter running state", level=logging.INFO)
+ logged = True
time.sleep(1)
- try:
- findmnt(pathlib.Path(f"{self.target}{mountpoint}"), traverse=False)
- except DiskError:
- raise DiskError(f"Target {self.target}{mountpoint} never got mounted properly (unable to get mount information using findmnt).")
+ logged = False
+ while 'Server: n/a' in SysCommand('timedatectl timesync-status --no-pager --property=Server --value'):
+ if not logged:
+ log(f"Waiting for timedatectl timesync-status to report a timesync against a server", level=logging.INFO)
+ logged = True
+ time.sleep(1)
- # once everything is mounted, we generate the key files in the correct place
- for handle in list_luks_handles:
- ppath = handle[1]['device_instance'].path
- log(f"creating key-file for {ppath}",level=logging.INFO)
- self._create_keyfile(handle[0],handle[1],handle[2])
+ def sync_log_to_install_medium(self) -> bool:
+ # Copy over the install log (if there is one) to the install medium if
+ # at least the base has been strapped in, otherwise we won't have a filesystem/structure to copy to.
+ if self.helper_flags.get('base-strapped', False) is True:
+ if filename := storage.get('LOG_FILE', None):
+ absolute_logfile = os.path.join(storage.get('LOG_PATH', './'), filename)
+
+ if not os.path.isdir(f"{self.target}/{os.path.dirname(absolute_logfile)}"):
+ os.makedirs(f"{self.target}/{os.path.dirname(absolute_logfile)}")
- def mount(self, partition :Partition, mountpoint :str, create_mountpoint :bool = True, options='') -> None:
- if create_mountpoint and not os.path.isdir(f'{self.target}{mountpoint}'):
- os.makedirs(f'{self.target}{mountpoint}')
+ shutil.copy2(absolute_logfile, f"{self.target}/{absolute_logfile}")
- partition.mount(f'{self.target}{mountpoint}', options=options)
+ return True
def add_swapfile(self, size='4G', enable_resume=True, file='/swapfile'):
if file[:1] != '/':
@@ -394,7 +411,7 @@ class Installer:
else:
pacman_conf.write(line)
- def pacstrap(self, *packages :str, **kwargs :str) -> bool:
+ def pacstrap(self, *packages: Union[str, List[str]], **kwargs :str) -> bool:
if type(packages[0]) in (list, tuple):
packages = packages[0]
@@ -437,7 +454,7 @@ class Installer:
return use_mirrors(mirrors, destination=f'{self.target}/etc/pacman.d/mirrorlist')
- def genfstab(self, flags :str = '-pU') -> bool:
+ def genfstab(self, flags :str = '-pU'):
self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO)
try:
@@ -460,7 +477,37 @@ class Installer:
for entry in self.FSTAB_ENTRIES:
fstab_fh.write(f'{entry}\n')
- return True
+ for mod in self._disk_config.device_modifications:
+ for part_mod in mod.partitions:
+ if part_mod.fs_type != disk.FilesystemType.Btrfs:
+ continue
+
+ fstab_file = Path(f'{self.target}/etc/fstab')
+
+ with fstab_file.open('r') as fp:
+ fstab = fp.readlines()
+
+ # Replace the {installation}/etc/fstab with entries
+ # using the compress=zstd where the mountpoint has compression set.
+ for index, line in enumerate(fstab):
+ # So first we grab the mount options by using subvol=.*? as a locator.
+ # And we also grab the mountpoint for the entry, for instance /var/log
+ subvoldef = re.findall(',.*?subvol=.*?[\t ]', line)
+ mountpoint = re.findall('[\t ]/.*?[\t ]', line)
+
+ if not subvoldef or not mountpoint:
+ continue
+
+ for sub_vol in part_mod.btrfs_subvols:
+ # We then locate the correct subvolume and check if it's compressed,
+ # and skip entries where compression is already defined
+ # We then sneak in the compress=zstd option if it doesn't already exist:
+ if sub_vol.compress and str(sub_vol.mountpoint) == Path(mountpoint[0].strip()) and ',compress=zstd,' not in line:
+ fstab[index] = line.replace(subvoldef[0], f',compress=zstd{subvoldef[0]}')
+ break
+
+ with fstab_file.open('w') as fp:
+ fp.writelines(fstab)
def set_hostname(self, hostname: str, *args :str, **kwargs :str) -> None:
with open(f'{self.target}/etc/hostname', 'w') as fh:
@@ -509,8 +556,8 @@ class Installer:
if result := plugin.on_timezone(zone):
zone = result
- if (pathlib.Path("/usr") / "share" / "zoneinfo" / zone).exists():
- (pathlib.Path(self.target) / "etc" / "localtime").unlink(missing_ok=True)
+ if (Path("/usr") / "share" / "zoneinfo" / zone).exists():
+ (Path(self.target) / "etc" / "localtime").unlink(missing_ok=True)
SysCommand(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{zone} /etc/localtime')
return True
@@ -523,10 +570,6 @@ class Installer:
return False
- def activate_ntp(self) -> None:
- log(f"activate_ntp() is deprecated, use activate_time_syncronization()", fg="yellow", level=logging.INFO)
- self.activate_time_syncronization()
-
def activate_time_syncronization(self) -> None:
self.log('Activating systemd-timesyncd for time synchronization using Arch Linux and ntp.org NTP servers.', level=logging.INFO)
self.enable_service('systemd-timesyncd')
@@ -540,7 +583,10 @@ class Installer:
# fstrim is owned by util-linux, a dependency of both base and systemd.
self.enable_service("fstrim.timer")
- def enable_service(self, *services :str) -> None:
+ def enable_service(self, *services: Union[str, List[str]]) -> None:
+ if type(services[0]) in (list, tuple):
+ services = services[0]
+
for service in services:
self.log(f'Enabling service {service}', level=logging.INFO)
try:
@@ -552,10 +598,10 @@ class Installer:
if hasattr(plugin, 'on_service'):
plugin.on_service(service)
- def run_command(self, cmd :str, *args :str, **kwargs :str) -> None:
+ def run_command(self, cmd :str, *args :str, **kwargs :str) -> SysCommand:
return SysCommand(f'/usr/bin/arch-chroot {self.target} {cmd}')
- def arch_chroot(self, cmd :str, run_as :Optional[str] = None):
+ def arch_chroot(self, cmd :str, run_as :Optional[str] = None) -> SysCommand:
if run_as:
cmd = f"su - {run_as} -c {shlex.quote(cmd)}"
@@ -645,21 +691,6 @@ class Installer:
return True
- def detect_encryption(self, partition :Partition) -> bool:
- from .disk.mapperdev import MapperDev
- from .disk.dmcryptdev import DMCryptDev
- from .disk.helpers import get_filesystem_type
-
- if type(partition) is MapperDev:
- # Returns MapperDev.partition
- return partition.partition
- elif type(partition) is DMCryptDev:
- return partition.MapperDev.partition
- elif get_filesystem_type(partition.path) == 'crypto_LUKS':
- return partition
-
- return False
-
def mkinitcpio(self, *flags :str) -> bool:
for plugin in plugins.values():
if hasattr(plugin, 'on_mkinitcpio'):
@@ -668,7 +699,7 @@ class Installer:
return True
# mkinitcpio will error out if there's no vconsole.
- if (vconsole := pathlib.Path(f"{self.target}/etc/vconsole.conf")).exists() is False:
+ if (vconsole := Path(f"{self.target}/etc/vconsole.conf")).exists() is False:
with vconsole.open('w') as fh:
fh.write(f"KEYMAP={storage['arguments']['keyboard-layout']}\n")
@@ -677,7 +708,7 @@ class Installer:
mkinit.write(f"BINARIES=({' '.join(self.BINARIES)})\n")
mkinit.write(f"FILES=({' '.join(self.FILES)})\n")
- if self._disk_encryption and not self._disk_encryption.hsm_device:
+ if not self._disk_encryption.hsm_device:
# For now, if we don't use HSM we revert to the old
# way of setting up encryption hooks for mkinitcpio.
# This is purely for stability reasons, we're going away from this.
@@ -694,46 +725,36 @@ class Installer:
return False
def minimal_installation(
- self, testing: bool = False, multilib: bool = False,
- hostname: str = 'archinstall', locales: List[str] = ['en_US.UTF-8 UTF-8']) -> bool:
- # Add necessary packages if encrypting the drive
- # (encrypted partitions default to btrfs for now, so we need btrfs-progs)
- # TODO: Perhaps this should be living in the function which dictates
- # the partitioning. Leaving here for now.
-
- for partition in self.partitions:
- if partition.filesystem == 'btrfs':
- # if partition.encrypted:
- if 'btrfs-progs' not in self.base_packages:
- self.base_packages.append('btrfs-progs')
- if partition.filesystem == 'xfs':
- if 'xfs' not in self.base_packages:
- self.base_packages.append('xfsprogs')
- if partition.filesystem == 'f2fs':
- if 'f2fs' not in self.base_packages:
- self.base_packages.append('f2fs-tools')
-
- # Configure mkinitcpio to handle some specific use cases.
- if partition.filesystem == 'btrfs':
- if 'btrfs' not in self.MODULES:
- self.MODULES.append('btrfs')
- if '/usr/bin/btrfs' not in self.BINARIES:
- self.BINARIES.append('/usr/bin/btrfs')
- # There is not yet an fsck tool for NTFS. If it's being used for the root filesystem, the hook should be removed.
- if partition.filesystem == 'ntfs3' and partition.mountpoint == self.target:
- if 'fsck' in self.HOOKS:
- self.HOOKS.remove('fsck')
-
- if self.detect_encryption(partition):
- if self._disk_encryption and self._disk_encryption.hsm_device:
- # Required bby mkinitcpio to add support for fido2-device options
- self.pacstrap('libfido2')
-
- if 'sd-encrypt' not in self.HOOKS:
- self.HOOKS.insert(self.HOOKS.index('filesystems'), 'sd-encrypt')
- else:
- if 'encrypt' not in self.HOOKS:
- self.HOOKS.insert(self.HOOKS.index('filesystems'), 'encrypt')
+ self,
+ testing: bool = False,
+ multilib: bool = False,
+ hostname: str = 'archinstall',
+ locales: List[str] = ['en_US.UTF-8 UTF-8']
+ ):
+ for mod in self._disk_config.device_modifications:
+ for part in mod.partitions:
+ if (pkg := part.fs_type.installation_pkg) is not None:
+ self.base_packages.append(pkg)
+ if (module := part.fs_type.installation_module) is not None:
+ self.MODULES.append(module)
+ if (binary := part.fs_type.installation_binary) is not None:
+ self.BINARIES.append(binary)
+
+ # There is not yet an fsck tool for NTFS. If it's being used for the root filesystem, the hook should be removed.
+ if part.fs_type.fs_type_mount == 'ntfs3' and part.mountpoint == self.target:
+ if 'fsck' in self.HOOKS:
+ self.HOOKS.remove('fsck')
+
+ if part in self._disk_encryption.partitions:
+ if self._disk_encryption.hsm_device:
+ # Required bby mkinitcpio to add support for fido2-device options
+ self.pacstrap('libfido2')
+
+ if 'sd-encrypt' not in self.HOOKS:
+ self.HOOKS.insert(self.HOOKS.index('filesystems'), 'sd-encrypt')
+ else:
+ if 'encrypt' not in self.HOOKS:
+ self.HOOKS.insert(self.HOOKS.index('filesystems'), 'encrypt')
if not has_uefi():
self.base_packages.append('grub')
@@ -742,11 +763,11 @@ class Installer:
vendor = cpu_vendor()
if vendor == "AuthenticAMD":
self.base_packages.append("amd-ucode")
- if (ucode := pathlib.Path(f"{self.target}/boot/amd-ucode.img")).exists():
+ if (ucode := Path(f"{self.target}/boot/amd-ucode.img")).exists():
ucode.unlink()
elif vendor == "GenuineIntel":
self.base_packages.append("intel-ucode")
- if (ucode := pathlib.Path(f"{self.target}/boot/intel-ucode.img")).exists():
+ if (ucode := Path(f"{self.target}/boot/intel-ucode.img")).exists():
ucode.unlink()
else:
self.log(f"Unknown CPU vendor '{vendor}' detected. Archinstall won't install any ucode.", level=logging.DEBUG)
@@ -802,9 +823,7 @@ class Installer:
if hasattr(plugin, 'on_install'):
plugin.on_install(self)
- return True
-
- def setup_swap(self, kind :str = 'zram') -> bool:
+ def setup_swap(self, kind :str = 'zram'):
if kind == 'zram':
self.log(f"Setting up swap on zram")
self.pacstrap('zram-generator')
@@ -818,16 +837,27 @@ class Installer:
self.enable_service('systemd-zram-setup@zram0.service')
self._zram_enabled = True
-
- return True
else:
raise ValueError(f"Archinstall currently only supports setting up swap on zram")
- def add_systemd_bootloader(self, boot_partition :Partition, root_partition :Partition) -> bool:
+ def _get_boot_partition(self) -> Optional[disk.PartitionModification]:
+ for layout in self._disk_config.device_modifications:
+ if boot := layout.get_boot_partition():
+ return boot
+ return None
+
+ def _get_root_partition(self) -> Optional[disk.PartitionModification]:
+ for mod in self._disk_config.device_modifications:
+ if root := mod.get_root_partition(self._disk_config.relative_mountpoint):
+ return root
+ return None
+
+ def _add_systemd_bootloader(self, root_partition: disk.PartitionModification):
self.pacstrap('efibootmgr')
if not has_uefi():
raise HardwareIncompatibilityError
+
# TODO: Ideally we would want to check if another config
# points towards the same disk and/or partition.
# And in which case we should do some clean up.
@@ -882,74 +912,73 @@ class Installer:
elif vendor == "GenuineIntel":
entry.write("initrd /intel-ucode.img\n")
else:
- self.log(f"Unknown CPU vendor '{vendor}' detected. Archinstall won't add any ucode to systemd-boot config.", level=logging.DEBUG)
+ self.log(
+ f"Unknown CPU vendor '{vendor}' detected. Archinstall won't add any ucode to systemd-boot config.",
+ level=logging.DEBUG)
entry.write(f"initrd /initramfs-{kernel}{variant}.img\n")
# blkid doesn't trigger on loopback devices really well,
# so we'll use the old manual method until we get that sorted out.
- root_fs_type = get_mount_fs_type(root_partition.filesystem)
- if root_fs_type is not None:
- options_entry = f'rw rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}\n'
- else:
- options_entry = f'rw {" ".join(self.KERNEL_PARAMS)}\n'
+ options_entry = f'rw rootfstype={root_partition.fs_type.fs_type_mount} {" ".join(self.KERNEL_PARAMS)}\n'
- for subvolume in root_partition.subvolumes:
- if subvolume.root is True and subvolume.name != '<FS_TREE>':
- options_entry = f"rootflags=subvol={subvolume.name} " + options_entry
+ for sub_vol in root_partition.btrfs_subvols:
+ if sub_vol.is_root():
+ options_entry = f"rootflags=subvol={sub_vol.name} " + options_entry
# Zswap should be disabled when using zram.
- #
# https://github.com/archlinux/archinstall/issues/881
if self._zram_enabled:
options_entry = "zswap.enabled=0 " + options_entry
- if real_device := self.detect_encryption(root_partition):
+ if root_partition.fs_type.is_crypto():
# TODO: We need to detect if the encrypted device is a whole disk encryption,
# or simply a partition encryption. Right now we assume it's a partition (and we always have)
- log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}/{real_device.part_uuid}'.", level=logging.DEBUG)
+ log('Root partition is an encrypted device, identifying by PARTUUID: {root_partition.partuuid}', level=logging.DEBUG)
kernel_options = f"options"
if self._disk_encryption and self._disk_encryption.hsm_device:
# Note: lsblk UUID must be used, not PARTUUID for sd-encrypt to work
- kernel_options += f" rd.luks.name={real_device.uuid}=luksdev"
+ kernel_options += f' rd.luks.name={root_partition.uuid}=luksdev'
# Note: tpm2-device and fido2-device don't play along very well:
# https://github.com/archlinux/archinstall/pull/1196#issuecomment-1129715645
- kernel_options += f" rd.luks.options=fido2-device=auto,password-echo=no"
+ kernel_options += f' rd.luks.options=fido2-device=auto,password-echo=no'
else:
- kernel_options += f" cryptdevice=PARTUUID={real_device.part_uuid}:luksdev"
+ kernel_options += f' cryptdevice=PARTUUID={root_partition.partuuid}:luksdev'
entry.write(f'{kernel_options} root=/dev/mapper/luksdev {options_entry}')
else:
- log(f"Identifying root partition by PARTUUID on {root_partition}, looking for '{root_partition.part_uuid}'.", level=logging.DEBUG)
- entry.write(f'options root=PARTUUID={root_partition.part_uuid} {options_entry}')
+ log(f'Identifying root partition by PARTUUID: {root_partition.partuuid}', level=logging.DEBUG)
+ entry.write(f'options root=PARTUUID={root_partition.partuuid} {options_entry}')
- self.helper_flags['bootloader'] = "systemd"
-
- return True
+ self.helper_flags['bootloader'] = 'systemd'
- def add_grub_bootloader(self, boot_partition :Partition, root_partition :Partition) -> bool:
+ def _add_grub_bootloader(
+ self,
+ boot_partition: disk.PartitionModification,
+ root_partition: disk.PartitionModification
+ ):
self.pacstrap('grub') # no need?
- root_fs_type = get_mount_fs_type(root_partition.filesystem)
+ _file = "/etc/default/grub"
- if real_device := self.detect_encryption(root_partition):
- root_uuid = SysCommand(f"blkid -s UUID -o value {real_device.path}").decode().rstrip()
- _file = "/etc/default/grub"
- add_to_CMDLINE_LINUX = f"sed -i 's/GRUB_CMDLINE_LINUX=\"\"/GRUB_CMDLINE_LINUX=\"cryptdevice=UUID={root_uuid}:cryptlvm rootfstype={root_fs_type}\"/'"
- enable_CRYPTODISK = "sed -i 's/#GRUB_ENABLE_CRYPTODISK=y/GRUB_ENABLE_CRYPTODISK=y/'"
+ if root_partition.fs_type.is_crypto():
+ log(f"Using UUID {root_partition.uuid} as encrypted root identifier", level=logging.DEBUG)
- log(f"Using UUID {root_uuid} of {real_device} as encrypted root identifier.", level=logging.INFO)
- SysCommand(f"/usr/bin/arch-chroot {self.target} {add_to_CMDLINE_LINUX} {_file}")
- SysCommand(f"/usr/bin/arch-chroot {self.target} {enable_CRYPTODISK} {_file}")
+ cmd_line_linux = f"sed -i 's/GRUB_CMDLINE_LINUX=\"\"/GRUB_CMDLINE_LINUX=\"cryptdevice=UUID={root_partition.uuid}:cryptlvm rootfstype={root_partition.fs_type.value}\"/'"
+ enable_cryptdisk = "sed -i 's/#GRUB_ENABLE_CRYPTODISK=y/GRUB_ENABLE_CRYPTODISK=y/'"
+
+ SysCommand(f"/usr/bin/arch-chroot {self.target} {enable_cryptdisk} {_file}")
else:
- _file = "/etc/default/grub"
- add_to_CMDLINE_LINUX = f"sed -i 's/GRUB_CMDLINE_LINUX=\"\"/GRUB_CMDLINE_LINUX=\"rootfstype={root_fs_type}\"/'"
- SysCommand(f"/usr/bin/arch-chroot {self.target} {add_to_CMDLINE_LINUX} {_file}")
+ cmd_line_linux = f"sed -i 's/GRUB_CMDLINE_LINUX=\"\"/GRUB_CMDLINE_LINUX=\"rootfstype={root_partition.fs_type.value}\"/'"
+
+ SysCommand(f"/usr/bin/arch-chroot {self.target} {cmd_line_linux} {_file}")
+
+ log(f"GRUB boot partition: {boot_partition.dev_path}", level=logging.INFO)
- log(f"GRUB uses {boot_partition.path} as the boot partition.", level=logging.INFO)
if has_uefi():
self.pacstrap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead?
+
try:
SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable', peek_output=True)
except SysCallError:
@@ -961,7 +990,7 @@ class Installer:
try:
SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=i386-pc --recheck {boot_partition.parent}', peek_output=True)
except SysCallError as error:
- raise DiskError(f"Could not install GRUB to {boot_partition.path}: {error}")
+ raise DiskError(f"Failed to install GRUB boot on {boot_partition.dev_path}: {error}")
try:
SysCommand(f'/usr/bin/arch-chroot {self.target} grub-mkconfig -o /boot/grub/grub.cfg')
@@ -970,22 +999,22 @@ class Installer:
self.helper_flags['bootloader'] = "grub"
- return True
-
- def add_efistub_bootloader(self, boot_partition :Partition, root_partition :Partition) -> bool:
+ def _add_efistub_bootloader(
+ self,
+ boot_partition: disk.PartitionModification,
+ root_partition: disk.PartitionModification
+ ):
self.pacstrap('efibootmgr')
if not has_uefi():
raise HardwareIncompatibilityError
+
# TODO: Ideally we would want to check if another config
# points towards the same disk and/or partition.
# And in which case we should do some clean up.
- root_fs_type = get_mount_fs_type(root_partition.filesystem)
-
for kernel in self.kernels:
# Setup the firmware entry
-
label = f'Arch Linux ({kernel})'
loader = f"/vmlinuz-{kernel}"
@@ -1004,22 +1033,22 @@ class Installer:
# blkid doesn't trigger on loopback devices really well,
# so we'll use the old manual method until we get that sorted out.
- if real_device := self.detect_encryption(root_partition):
+
+ if root_partition.fs_type.is_crypto():
# TODO: We need to detect if the encrypted device is a whole disk encryption,
# or simply a partition encryption. Right now we assume it's a partition (and we always have)
- log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.part_uuid}'.", level=logging.DEBUG)
- kernel_parameters.append(f'cryptdevice=PARTUUID={real_device.part_uuid}:luksdev root=/dev/mapper/luksdev rw rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}')
+ log(f'Identifying root partition by PARTUUID: {root_partition.partuuid}', level=logging.DEBUG)
+ kernel_parameters.append(f'cryptdevice=PARTUUID={root_partition.partuuid}:luksdev root=/dev/mapper/luksdev rw rootfstype={root_partition.fs_type.value} {" ".join(self.KERNEL_PARAMS)}')
else:
- log(f"Identifying root partition by PART-UUID on {root_partition}, looking for '{root_partition.part_uuid}'.", level=logging.DEBUG)
- kernel_parameters.append(f'root=PARTUUID={root_partition.part_uuid} rw rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}')
+ log(f'Root partition is an encrypted device identifying by PARTUUID: {root_partition.partuuid}', level=logging.DEBUG)
+ kernel_parameters.append(f'root=PARTUUID={root_partition.partuuid} rw rootfstype={root_partition.fs_type.value} {" ".join(self.KERNEL_PARAMS)}')
- SysCommand(f'efibootmgr --disk {boot_partition.path[:-1]} --part {boot_partition.path[-1]} --create --label "{label}" --loader {loader} --unicode \'{" ".join(kernel_parameters)}\' --verbose')
+ device = disk.device_handler.get_device_by_partition_path(boot_partition.dev_path)
+ SysCommand(f'efibootmgr --disk {device.path} --part {device.path} --create --label "{label}" --loader {loader} --unicode \'{" ".join(kernel_parameters)}\' --verbose')
self.helper_flags['bootloader'] = "efistub"
- return True
-
- def add_bootloader(self, bootloader :str = 'systemd-bootctl') -> bool:
+ def add_bootloader(self, bootloader: Bootloader) -> bool:
"""
Adds a bootloader to the installation instance.
Archinstall supports one of three types:
@@ -1039,52 +1068,33 @@ class Installer:
return True
if type(self.target) == str:
- self.target = pathlib.Path(self.target)
-
- boot_partition = None
- root_partition = None
- for partition in self.partitions:
- if self.target / 'boot' in partition.mountpoints:
- boot_partition = partition
- elif self.target in partition.mountpoints:
- root_partition = partition
-
- if boot_partition is None or root_partition is None:
- raise ValueError(f"Could not detect root ({root_partition}) or boot ({boot_partition}) in {self.target} based on: {self.partitions}")
-
- self.log(f'Adding bootloader {bootloader} to {boot_partition if boot_partition else root_partition}', level=logging.INFO)
-
- if bootloader == 'systemd-bootctl':
- self.add_systemd_bootloader(boot_partition, root_partition)
- elif bootloader == "grub-install":
- self.add_grub_bootloader(boot_partition, root_partition)
- elif bootloader == 'efistub':
- self.add_efistub_bootloader(boot_partition, root_partition)
- else:
- raise RequirementError(f"Unknown (or not yet implemented) bootloader requested: {bootloader}")
+ self.target = Path(self.target)
- return True
+ boot_partition = self._get_boot_partition()
+ root_partition = self._get_root_partition()
- def add_additional_packages(self, *packages :str) -> bool:
- return self.pacstrap(*packages)
+ if boot_partition is None:
+ raise ValueError(f'Could not detect boot at mountpoint {self.target}')
- def install_profile(self, profile :str) -> ModuleType:
- """
- Installs a archinstall profile script (.py file).
- This profile can be either local, remote or part of the library.
+ if root_partition is None:
+ raise ValueError(f'Could not detect root at mountpoint {self.target}')
- :param profile: Can be a local path or a remote path (URL)
- :return: Returns the imported script as a module, this way
- you can access any remaining functions exposed by the profile.
- :rtype: module
- """
- storage['installation_session'] = self
+ self.log(f'Adding bootloader {bootloader.value} to {boot_partition.dev_path}', level=logging.INFO)
+
+ match bootloader:
+ case Bootloader.Systemd:
+ self._add_systemd_bootloader(root_partition)
+ case Bootloader.Grub:
+ self._add_grub_bootloader(boot_partition, root_partition)
+ case Bootloader.Efistub:
+ self._add_efistub_bootloader(boot_partition, root_partition)
- if type(profile) == str:
- profile = Profile(self, profile)
+ def add_additional_packages(self, *packages: Union[str, List[str]]) -> bool:
+ return self.pacstrap(*packages)
- self.log(f'Installing archinstall profile {profile}', level=logging.INFO)
- return profile.install()
+ def _enable_users(self, service: str, users: List[User]):
+ for user in users:
+ self.arch_chroot(f'systemctl enable --user {service}', run_as=user.username)
def enable_sudo(self, entity: str, group :bool = False):
self.log(f'Enabling sudo permissions for {entity}.', level=logging.INFO)
@@ -1092,7 +1102,7 @@ class Installer:
sudoers_dir = f"{self.target}/etc/sudoers.d"
# Creates directory if not exists
- if not (sudoers_path := pathlib.Path(sudoers_dir)).exists():
+ if not (sudoers_path := Path(sudoers_dir)).exists():
sudoers_path.mkdir(parents=True)
# Guarantees sudoer confs directory recommended perms
os.chmod(sudoers_dir, 0o440)
@@ -1114,7 +1124,7 @@ class Installer:
sudoers.write(f'{"%" if group else ""}{entity} ALL=(ALL) ALL\n')
# Guarantees sudoer conf file recommended perms
- os.chmod(pathlib.Path(rule_file_name), 0o440)
+ os.chmod(Path(rule_file_name), 0o440)
def create_users(self, users: Union[User, List[User]]):
if not isinstance(users, list):