From 00b0ae7ba439a5a420095175b3bedd52c569db51 Mon Sep 17 00:00:00 2001 From: Daniel Girtler Date: Wed, 19 Apr 2023 20:55:42 +1000 Subject: PyParted and a large rewrite of the underlying partitioning (#1604) * Invert mypy files * Add optional pre-commit hooks * New profile structure * Serialize profiles * Use profile instead of classmethod * Custom profile setup * Separator between back * Support profile import via url * Move profiles module * Refactor files * Remove symlink * Add user to docker group * Update schema description * Handle list services * mypy fixes * mypy fixes * Rename profilesv2 to profiles * flake8 * mypy again * Support selecting DM * Fix mypy * Cleanup * Update greeter setting * Update schema * Revert toml changes * Poc external dependencies * Dependency support * New encryption menu * flake8 * Mypy and flake8 * Unify lsblk command * Update bootloader configuration * Git hooks * Fix import * Pyparted * Remove custom font setting * flake8 * Remove default preview * Manual partitioning menu * Update structure * Disk configuration * Update filesystem * luks2 encryption * Everything works until installation * Btrfsutil * Btrfs handling * Update btrfs * Save encryption config * Fix pipewire issue * Update mypy version * Update all pre-commit * Update package versions * Revert audio/pipewire * Merge master PRs * Add master changes * Merge master changes * Small renaming * Pull master changes * Reset disk enc after disk config change * Generate locals * Update naming * Fix imports * Fix broken sync * Fix pre selection on table menu * Profile menu * Update profile * Fix post_install * Added python-pyparted to PKGBUILD, this requires [testing] to be enabled in order to run makepkg. Package still works via python -m build etc. * Swaped around some setuptools logic in pyproject Since we define `package-data` and `packages` there should be no need for: ``` [tool.setuptools.packages.find] where = ["archinstall", "archinstall.*"] ``` * Removed pyproject collisions. Duplicate definitions. * Made sure pyproject.toml includes languages * Add example and update README * Fix pyproject issues * Generate locale * Refactor imports * Simplify imports * Add profile description and package examples * Align code * Fix mypy * Simplify imports * Fix saving config * Fix wrong luks merge * Refactor installation * Fix cdrom device loading * Fix wrongly merged code * Fix imports and greeter * Don't terminate on partprobe error * Use specific path on partprobe from luks * Update archinstall/lib/disk/device_model.py Co-authored-by: codefiles <11915375+codefiles@users.noreply.github.com> * Update archinstall/lib/disk/device_model.py Co-authored-by: codefiles <11915375+codefiles@users.noreply.github.com> * Update github workflow to test archinstall installation * Update sway merge * Generate locales * Update workflow --------- Co-authored-by: Daniel Girtler Co-authored-by: Anton Hvornum Co-authored-by: Anton Hvornum Co-authored-by: codefiles <11915375+codefiles@users.noreply.github.com> --- archinstall/lib/luks.py | 331 ++++++++++++++++++++++++++---------------------- 1 file changed, 180 insertions(+), 151 deletions(-) (limited to 'archinstall/lib/luks.py') diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py index ad6bf093..fc531a06 100644 --- a/archinstall/lib/luks.py +++ b/archinstall/lib/luks.py @@ -1,92 +1,78 @@ from __future__ import annotations -import json + import logging -import os -import pathlib import shlex import time -from typing import Optional, List,TYPE_CHECKING -# https://stackoverflow.com/a/39757388/929999 -if TYPE_CHECKING: - from .installer import Installer +from dataclasses import dataclass +from pathlib import Path +from typing import Optional, List -from .disk import Partition, convert_device_to_uuid -from .general import SysCommand, SysCommandWorker +from . import disk +from .general import SysCommand, generate_password, SysCommandWorker from .output import log from .exceptions import SysCallError, DiskError from .storage import storage -from .disk.helpers import get_filesystem_type -from .disk.mapperdev import MapperDev -from .disk.btrfs import BTRFSPartition - - -class luks2: - def __init__(self, - partition: Partition, - mountpoint: Optional[str], - password: Optional[str], - key_file :Optional[str] = None, - auto_unmount :bool = False, - *args :str, - **kwargs :str): - - self.password = password - self.partition = partition - self.mountpoint = mountpoint - self.args = args - self.kwargs = kwargs - self.key_file = key_file - self.auto_unmount = auto_unmount - self.filesystem = 'crypto_LUKS' - self.mapdev = None - - def __enter__(self) -> Partition: - if not self.key_file: - self.key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw" # TODO: Make disk-pw-file randomly unique? - - if type(self.password) != bytes: - self.password = bytes(self.password, 'UTF-8') - - with open(self.key_file, 'wb') as fh: - fh.write(self.password) - - return self.unlock(self.partition, self.mountpoint, self.key_file) - - def __exit__(self, *args :str, **kwargs :str) -> bool: - # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager + + +@dataclass +class Luks2: + luks_dev_path: Path + mapper_name: Optional[str] = None + password: Optional[str] = None + key_file: Optional[Path] = None + auto_unmount: bool = False + + # will be set internally after unlocking the device + _mapper_dev: Optional[Path] = None + + @property + def mapper_dev(self) -> Optional[Path]: + if self.mapper_name: + return Path(f'/dev/mapper/{self.mapper_name}') + return None + + def __post_init__(self): + if self.luks_dev_path is None: + raise ValueError('Partition must have a path set') + + def __enter__(self): + self.unlock(self.key_file) + + def __exit__(self, *args: str, **kwargs: str): if self.auto_unmount: - self.close() + self.lock() + + def _default_key_file(self) -> Path: + return Path(f'/tmp/{self.luks_dev_path.name}.disk_pw') - if len(args) >= 2 and args[1]: - raise args[1] + def _password_bytes(self) -> bytes: + if not self.password: + raise ValueError('Password for luks2 device was not specified') - return True + if isinstance(self.password, bytes): + return self.password + else: + return bytes(self.password, 'UTF-8') - def encrypt(self, partition :Partition, - password :Optional[str] = None, - key_size :int = 512, - hash_type :str = 'sha512', - iter_time :int = 10000, - key_file :Optional[str] = None) -> str: + def encrypt( + self, + key_size: int = 512, + hash_type: str = 'sha512', + iter_time: int = 10000, + key_file: Optional[Path] = None + ) -> Path: + log(f'Luks2 encrypting: {self.luks_dev_path}', level=logging.INFO) - log(f'Encrypting {partition} (This might take a while)', level=logging.INFO) + byte_password = self._password_bytes() if not key_file: if self.key_file: key_file = self.key_file else: - key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw" # TODO: Make disk-pw-file randomly unique? - - if not password: - password = self.password - - if type(password) != bytes: - password = bytes(password, 'UTF-8') + key_file = self._default_key_file() - with open(key_file, 'wb') as fh: - fh.write(password) - - partition.partprobe() + with open(key_file, 'wb') as fh: + fh.write(byte_password) cryptsetup_args = shlex.join([ '/usr/bin/cryptsetup', @@ -97,120 +83,163 @@ class luks2: '--hash', hash_type, '--key-size', str(key_size), '--iter-time', str(iter_time), - '--key-file', os.path.abspath(key_file), + '--key-file', str(key_file), '--use-urandom', - 'luksFormat', partition.path, + 'luksFormat', str(self.luks_dev_path), ]) try: # Retry formatting the volume because archinstall can some times be too quick # which generates a "Device /dev/sdX does not exist or access denied." between # setting up partitions and us trying to encrypt it. + cmd_handle = None for i in range(storage['DISK_RETRY_ATTEMPTS']): if (cmd_handle := SysCommand(cryptsetup_args)).exit_code != 0: time.sleep(storage['DISK_TIMEOUTS']) else: break - if cmd_handle.exit_code != 0: - raise DiskError(f'Could not encrypt volume "{partition.path}": {b"".join(cmd_handle)}') + if cmd_handle is not None and cmd_handle.exit_code != 0: + output = str(b''.join(cmd_handle)) + raise DiskError(f'Could not encrypt volume "{self.luks_dev_path}": {output}') except SysCallError as err: if err.exit_code == 1: - log(f'{partition} is being used, trying to unmount and crypt-close the device and running one more attempt at encrypting the device.', level=logging.DEBUG) - # Partition was in use, unmount it and try again - partition.unmount() - - # Get crypt-information about the device by doing a reverse lookup starting with the partition path - # For instance: /dev/sda - SysCommand(f'bash -c "partprobe"') - devinfo = json.loads(b''.join(SysCommand(f"lsblk --fs -J {partition.path}")).decode('UTF-8'))['blockdevices'][0] - - # For each child (sub-partition/sub-device) - if len(children := devinfo.get('children', [])): - for child in children: - # Unmount the child location - if child_mountpoint := child.get('mountpoint', None): - log(f'Unmounting {child_mountpoint}', level=logging.DEBUG) - SysCommand(f"umount -R {child_mountpoint}") - - # And close it if possible. - log(f"Closing crypt device {child['name']}", level=logging.DEBUG) - SysCommand(f"cryptsetup close {child['name']}") + log(f'luks2 partition currently in use: {self.luks_dev_path}') + log('Attempting to unmount, crypt-close and trying encryption again') + self.lock() # Then try again to set up the crypt-device - cmd_handle = SysCommand(cryptsetup_args) + SysCommand(cryptsetup_args) else: raise err return key_file - def unlock(self, partition :Partition, mountpoint :str, key_file :str) -> Partition: + def _get_luks_uuid(self) -> str: + command = f'/usr/bin/cryptsetup luksUUID {self.luks_dev_path}' + + try: + result = SysCommand(command) + if result.exit_code != 0: + raise DiskError(f'Unable to get UUID for Luks device: {result.decode()}') + + return result.decode() # type: ignore + except SysCallError as err: + log(f'Unable to get UUID for Luks device: {self.luks_dev_path}', level=logging.INFO) + raise err + + def is_unlocked(self) -> bool: + return self.mapper_name is not None and Path(f'/dev/mapper/{self.mapper_name}').exists() + + def unlock(self, key_file: Optional[Path] = None): """ - Mounts a luks2 compatible partition to a certain mountpoint. - Keyfile must be specified as there's no way to interact with the pw-prompt atm. + Unlocks the luks device, an optional key file location for unlocking can be specified, + otherwise a default location for the key file will be used. - :param mountpoint: The name without absolute path, for instance "luksdev" will point to /dev/mapper/luksdev - :type mountpoint: str + :param key_file: An alternative key file + :type key_file: Path """ + log(f'Unlocking luks2 device: {self.luks_dev_path}', level=logging.DEBUG) + + if not self.mapper_name: + raise ValueError('mapper name missing') + + byte_password = self._password_bytes() + + if not key_file: + if self.key_file: + key_file = self.key_file + else: + key_file = self._default_key_file() - if '/' in mountpoint: - os.path.basename(mountpoint) # TODO: Raise exception instead? + with open(key_file, 'wb') as fh: + fh.write(byte_password) wait_timer = time.time() - while pathlib.Path(partition.path).exists() is False and time.time() - wait_timer < 10: + while Path(self.luks_dev_path).exists() is False and time.time() - wait_timer < 10: time.sleep(0.025) - SysCommand(f'/usr/bin/cryptsetup open {partition.path} {mountpoint} --key-file {os.path.abspath(key_file)} --type luks2') - if os.path.islink(f'/dev/mapper/{mountpoint}'): - self.mapdev = f'/dev/mapper/{mountpoint}' - - if (filesystem_type := get_filesystem_type(pathlib.Path(self.mapdev))) == 'btrfs': - return BTRFSPartition( - self.mapdev, - block_device=MapperDev(mountpoint).partition.block_device, - encrypted=True, - filesystem=filesystem_type, - autodetect_filesystem=False - ) - - return Partition( - self.mapdev, - block_device=MapperDev(mountpoint).partition.block_device, - encrypted=True, - filesystem=get_filesystem_type(self.mapdev), - autodetect_filesystem=False - ) - - def close(self, mountpoint :Optional[str] = None) -> bool: - if not mountpoint: - mountpoint = self.mapdev - - SysCommand(f'/usr/bin/cryptsetup close {self.mapdev}') - return os.path.islink(self.mapdev) is False - - def format(self, path :str) -> None: - if (handle := SysCommand(f"/usr/bin/cryptsetup -q -v luksErase {path}")).exit_code != 0: - raise DiskError(f'Could not format {path} with {self.filesystem} because: {b"".join(handle)}') - - def add_key(self, path :pathlib.Path, password :str) -> bool: - if not path.exists(): - raise OSError(2, f"Could not import {path} as a disk encryption key, file is missing.", str(path)) - - log(f'Adding additional key-file {path} for {self.partition}', level=logging.INFO) - worker = SysCommandWorker(f"/usr/bin/cryptsetup -q -v luksAddKey {self.partition.path} {path}", - environment_vars={'LC_ALL':'C'}) + SysCommand(f'/usr/bin/cryptsetup open {self.luks_dev_path} {self.mapper_name} --key-file {key_file} --type luks2') + + if not self.mapper_dev or not self.mapper_dev.is_symlink(): + raise DiskError(f'Failed to open luks2 device: {self.luks_dev_path}') + + def lock(self): + disk.device_handler.umount(self.luks_dev_path) + + # Get crypt-information about the device by doing a reverse lookup starting with the partition path + # For instance: /dev/sda + disk.device_handler.partprobe(self.luks_dev_path) + lsblk_info = disk.get_lsblk_info(self.luks_dev_path) + + # For each child (sub-partition/sub-device) + for child in lsblk_info.children: + # Unmount the child location + for mountpoint in child.mountpoints: + log(f'Unmounting {mountpoint}', level=logging.DEBUG) + disk.device_handler.umount(mountpoint, recursive=True) + + # And close it if possible. + log(f"Closing crypt device {child.name}", level=logging.DEBUG) + SysCommand(f"cryptsetup close {child.name}") + + self._mapper_dev = None + + def create_keyfile(self, target_path: Path, override: bool = False): + """ + Routine to create keyfiles, so it can be moved elsewhere + """ + if self.mapper_name is None: + raise ValueError('Mapper name must be provided') + + # Once we store the key as ../xyzloop.key systemd-cryptsetup can + # automatically load this key if we name the device to "xyzloop" + key_file_path = target_path / 'etc/cryptsetup-keys.d/' / self.mapper_name + key_file = key_file_path / '.key' + crypttab_path = target_path / 'etc/crypttab' + + if key_file.exists(): + if not override: + log(f'Key file {key_file} already exists, keeping existing') + return + else: + log(f'Key file {key_file} already exists, overriding') + + key_file_path.mkdir(parents=True, exist_ok=True) + + with open(key_file, "w") as keyfile: + keyfile.write(generate_password(length=512)) + + key_file_path.chmod(0o400) + + self._add_key(key_file) + self._crypttab(crypttab_path, key_file, options=["luks", "key-slot=1"]) + + def _add_key(self, key_file: Path): + log(f'Adding additional key-file {key_file}', level=logging.INFO) + + command = f'/usr/bin/cryptsetup -q -v luksAddKey {self.luks_dev_path} {key_file}' + worker = SysCommandWorker(command, environment_vars={'LC_ALL': 'C'}) pw_injected = False + while worker.is_alive(): if b'Enter any existing passphrase' in worker and pw_injected is False: - worker.write(bytes(password, 'UTF-8')) + worker.write(self._password_bytes()) pw_injected = True if worker.exit_code != 0: - raise DiskError(f'Could not add encryption key {path} to {self.partition} because: {worker}') - - return True - - def crypttab(self, installation :Installer, key_path :str, options :List[str] = ["luks", "key-slot=1"]) -> None: - log(f'Adding a crypttab entry for key {key_path} in {installation}', level=logging.INFO) - with open(f"{installation.target}/etc/crypttab", "a") as crypttab: - crypttab.write(f"{self.mountpoint} UUID={convert_device_to_uuid(self.partition.path)} {key_path} {','.join(options)}\n") + raise DiskError(f'Could not add encryption key {key_file} to {self.luks_dev_path}: {worker.decode()}') + + def _crypttab( + self, + crypttab_path: Path, + key_file: Path, + options: List[str] + ) -> None: + log(f'Adding crypttab entry for key {key_file}', level=logging.INFO) + + with open(crypttab_path, 'a') as crypttab: + opt = ','.join(options) + uuid = self._get_luks_uuid() + row = f"{self.mapper_name} UUID={uuid} {key_file} {opt}\n" + crypttab.write(row) -- cgit v1.2.3-54-g00ecf