index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
-rw-r--r-- | archinstall/lib/luks.py | 361 |
diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py index ad6bf093..50e15cee 100644 --- a/archinstall/lib/luks.py +++ b/archinstall/lib/luks.py @@ -1,92 +1,77 @@ from __future__ import annotations -import json -import logging -import os -import pathlib + import shlex import time -from typing import Optional, List,TYPE_CHECKING -# https://stackoverflow.com/a/39757388/929999 -if TYPE_CHECKING: - from .installer import Installer - -from .disk import Partition, convert_device_to_uuid -from .general import SysCommand, SysCommandWorker -from .output import log +from dataclasses import dataclass +from pathlib import Path +from typing import Optional, List + +from . import disk +from .general import SysCommand, generate_password, SysCommandWorker +from .output import info, debug from .exceptions import SysCallError, DiskError from .storage import storage -from .disk.helpers import get_filesystem_type -from .disk.mapperdev import MapperDev -from .disk.btrfs import BTRFSPartition - - -class luks2: - def __init__(self, - partition: Partition, - mountpoint: Optional[str], - password: Optional[str], - key_file :Optional[str] = None, - auto_unmount :bool = False, - *args :str, - **kwargs :str): - - self.password = password - self.partition = partition - self.mountpoint = mountpoint - self.args = args - self.kwargs = kwargs - self.key_file = key_file - self.auto_unmount = auto_unmount - self.filesystem = 'crypto_LUKS' - self.mapdev = None - def __enter__(self) -> Partition: - if not self.key_file: - self.key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw" # TODO: Make disk-pw-file randomly unique? - if type(self.password) != bytes: - self.password = bytes(self.password, 'UTF-8') +@dataclass +class Luks2: + luks_dev_path: Path + mapper_name: Optional[str] = None + password: Optional[str] = None + key_file: Optional[Path] = None + auto_unmount: bool = False - with open(self.key_file, 'wb') as fh: - fh.write(self.password) + # will be set internally after unlocking the device + _mapper_dev: Optional[Path] = None - return self.unlock(self.partition, self.mountpoint, self.key_file) + @property + def mapper_dev(self) -> Optional[Path]: + if self.mapper_name: + return Path(f'/dev/mapper/{self.mapper_name}') + return None - def __exit__(self, *args :str, **kwargs :str) -> bool: - # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager + def __post_init__(self): + if self.luks_dev_path is None: + raise ValueError('Partition must have a path set') + + def __enter__(self): + self.unlock(self.key_file) + + def __exit__(self, *args: str, **kwargs: str): if self.auto_unmount: - self.close() + self.lock() + + def _default_key_file(self) -> Path: + return Path(f'/tmp/{self.luks_dev_path.name}.disk_pw') - if len(args) >= 2 and args[1]: - raise args[1] + def _password_bytes(self) -> bytes: + if not self.password: + raise ValueError('Password for luks2 device was not specified') - return True + if isinstance(self.password, bytes): + return self.password + else: + return bytes(self.password, 'UTF-8') - def encrypt(self, partition :Partition, - password :Optional[str] = None, - key_size :int = 512, - hash_type :str = 'sha512', - iter_time :int = 10000, - key_file :Optional[str] = None) -> str: + def encrypt( + self, + key_size: int = 512, + hash_type: str = 'sha512', + iter_time: int = 10000, + key_file: Optional[Path] = None + ) -> Path: + debug(f'Luks2 encrypting: {self.luks_dev_path}') - log(f'Encrypting {partition} (This might take a while)', level=logging.INFO) + byte_password = self._password_bytes() if not key_file: if self.key_file: key_file = self.key_file else: - key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw" # TODO: Make disk-pw-file randomly unique? + key_file = self._default_key_file() - if not password: - password = self.password - - if type(password) != bytes: - password = bytes(password, 'UTF-8') - - with open(key_file, 'wb') as fh: - fh.write(password) - - partition.partprobe() + with open(key_file, 'wb') as fh: + fh.write(byte_password) cryptsetup_args = shlex.join([ '/usr/bin/cryptsetup', @@ -97,120 +82,170 @@ class luks2: '--hash', hash_type, '--key-size', str(key_size), '--iter-time', str(iter_time), - '--key-file', os.path.abspath(key_file), + '--key-file', str(key_file), '--use-urandom', - 'luksFormat', partition.path, + 'luksFormat', str(self.luks_dev_path), ]) - try: - # Retry formatting the volume because archinstall can some times be too quick - # which generates a "Device /dev/sdX does not exist or access denied." between - # setting up partitions and us trying to encrypt it. - for i in range(storage['DISK_RETRY_ATTEMPTS']): - if (cmd_handle := SysCommand(cryptsetup_args)).exit_code != 0: - time.sleep(storage['DISK_TIMEOUTS']) + debug(f'cryptsetup format: {cryptsetup_args}') + + # Retry formatting the volume because archinstall can some times be too quick + # which generates a "Device /dev/sdX does not exist or access denied." between + # setting up partitions and us trying to encrypt it. + for retry_attempt in range(storage['DISK_RETRY_ATTEMPTS'] + 1): + try: + result = SysCommand(cryptsetup_args).decode() + debug(f'cryptsetup luksFormat output: {result}') + break + except SysCallError as err: + time.sleep(storage['DISK_TIMEOUTS']) + + if retry_attempt != storage['DISK_RETRY_ATTEMPTS']: + continue + + if err.exit_code == 1: + info(f'luks2 partition currently in use: {self.luks_dev_path}') + info('Attempting to unmount, crypt-close and trying encryption again') + + self.lock() + # Then try again to set up the crypt-device + result = SysCommand(cryptsetup_args).decode() + debug(f'cryptsetup luksFormat output: {result}') else: - break + raise DiskError(f'Could not encrypt volume "{self.luks_dev_path}": {err}') - if cmd_handle.exit_code != 0: - raise DiskError(f'Could not encrypt volume "{partition.path}": {b"".join(cmd_handle)}') - except SysCallError as err: - if err.exit_code == 1: - log(f'{partition} is being used, trying to unmount and crypt-close the device and running one more attempt at encrypting the device.', level=logging.DEBUG) - # Partition was in use, unmount it and try again - partition.unmount() - - # Get crypt-information about the device by doing a reverse lookup starting with the partition path - # For instance: /dev/sda - SysCommand(f'bash -c "partprobe"') - devinfo = json.loads(b''.join(SysCommand(f"lsblk --fs -J {partition.path}")).decode('UTF-8'))['blockdevices'][0] - - # For each child (sub-partition/sub-device) - if len(children := devinfo.get('children', [])): - for child in children: - # Unmount the child location - if child_mountpoint := child.get('mountpoint', None): - log(f'Unmounting {child_mountpoint}', level=logging.DEBUG) - SysCommand(f"umount -R {child_mountpoint}") - - # And close it if possible. - log(f"Closing crypt device {child['name']}", level=logging.DEBUG) - SysCommand(f"cryptsetup close {child['name']}") - - # Then try again to set up the crypt-device - cmd_handle = SysCommand(cryptsetup_args) - else: - raise err + self.key_file = key_file return key_file - def unlock(self, partition :Partition, mountpoint :str, key_file :str) -> Partition: + def _get_luks_uuid(self) -> str: + command = f'/usr/bin/cryptsetup luksUUID {self.luks_dev_path}' + + try: + return SysCommand(command).decode() + except SysCallError as err: + info(f'Unable to get UUID for Luks device: {self.luks_dev_path}') + raise err + + def is_unlocked(self) -> bool: + return self.mapper_name is not None and Path(f'/dev/mapper/{self.mapper_name}').exists() + + def unlock(self, key_file: Optional[Path] = None): """ - Mounts a luks2 compatible partition to a certain mountpoint. - Keyfile must be specified as there's no way to interact with the pw-prompt atm. + Unlocks the luks device, an optional key file location for unlocking can be specified, + otherwise a default location for the key file will be used. - :param mountpoint: The name without absolute path, for instance "luksdev" will point to /dev/mapper/luksdev - :type mountpoint: str + :param key_file: An alternative key file + :type key_file: Path """ + debug(f'Unlocking luks2 device: {self.luks_dev_path}') - if '/' in mountpoint: - os.path.basename(mountpoint) # TODO: Raise exception instead? + if not self.mapper_name: + raise ValueError('mapper name missing') + + byte_password = self._password_bytes() + + if not key_file: + if self.key_file: + key_file = self.key_file + else: + key_file = self._default_key_file() + + with open(key_file, 'wb') as fh: + fh.write(byte_password) wait_timer = time.time() - while pathlib.Path(partition.path).exists() is False and time.time() - wait_timer < 10: + while Path(self.luks_dev_path).exists() is False and time.time() - wait_timer < 10: time.sleep(0.025) - SysCommand(f'/usr/bin/cryptsetup open {partition.path} {mountpoint} --key-file {os.path.abspath(key_file)} --type luks2') - if os.path.islink(f'/dev/mapper/{mountpoint}'): - self.mapdev = f'/dev/mapper/{mountpoint}' - - if (filesystem_type := get_filesystem_type(pathlib.Path(self.mapdev))) == 'btrfs': - return BTRFSPartition( - self.mapdev, - block_device=MapperDev(mountpoint).partition.block_device, - encrypted=True, - filesystem=filesystem_type, - autodetect_filesystem=False - ) - - return Partition( - self.mapdev, - block_device=MapperDev(mountpoint).partition.block_device, - encrypted=True, - filesystem=get_filesystem_type(self.mapdev), - autodetect_filesystem=False - ) - - def close(self, mountpoint :Optional[str] = None) -> bool: - if not mountpoint: - mountpoint = self.mapdev - - SysCommand(f'/usr/bin/cryptsetup close {self.mapdev}') - return os.path.islink(self.mapdev) is False - - def format(self, path :str) -> None: - if (handle := SysCommand(f"/usr/bin/cryptsetup -q -v luksErase {path}")).exit_code != 0: - raise DiskError(f'Could not format {path} with {self.filesystem} because: {b"".join(handle)}') - - def add_key(self, path :pathlib.Path, password :str) -> bool: - if not path.exists(): - raise OSError(2, f"Could not import {path} as a disk encryption key, file is missing.", str(path)) - - log(f'Adding additional key-file {path} for {self.partition}', level=logging.INFO) - worker = SysCommandWorker(f"/usr/bin/cryptsetup -q -v luksAddKey {self.partition.path} {path}", - environment_vars={'LC_ALL':'C'}) + result = SysCommand( + '/usr/bin/cryptsetup open ' + f'{self.luks_dev_path} ' + f'{self.mapper_name} ' + f'--key-file {key_file} ' + f'--type luks2' + ).decode() + + debug(f'cryptsetup open output: {result}') + + if not self.mapper_dev or not self.mapper_dev.is_symlink(): + raise DiskError(f'Failed to open luks2 device: {self.luks_dev_path}') + + def lock(self): + disk.device_handler.umount(self.luks_dev_path) + + # Get crypt-information about the device by doing a reverse lookup starting with the partition path + # For instance: /dev/sda + lsblk_info = disk.get_lsblk_info(self.luks_dev_path) + + # For each child (sub-partition/sub-device) + for child in lsblk_info.children: + # Unmount the child location + for mountpoint in child.mountpoints: + debug(f'Unmounting {mountpoint}') + disk.device_handler.umount(mountpoint, recursive=True) + + # And close it if possible. + debug(f"Closing crypt device {child.name}") + SysCommand(f"cryptsetup close {child.name}") + + self._mapper_dev = None + + def create_keyfile(self, target_path: Path, override: bool = False): + """ + Routine to create keyfiles, so it can be moved elsewhere + """ + if self.mapper_name is None: + raise ValueError('Mapper name must be provided') + + # Once we store the key as ../xyzloop.key systemd-cryptsetup can + # automatically load this key if we name the device to "xyzloop" + kf_path = Path(f'/etc/cryptsetup-keys.d/{self.mapper_name}.key') + key_file = target_path / kf_path.relative_to(kf_path.root) + crypttab_path = target_path / 'etc/crypttab' + + if key_file.exists(): + if not override: + info(f'Key file {key_file} already exists, keeping existing') + return + else: + info(f'Key file {key_file} already exists, overriding') + + key_file.parent.mkdir(parents=True, exist_ok=True) + + pwd = generate_password(length=512) + key_file.write_text(pwd) + + key_file.chmod(0o400) + + self._add_key(key_file) + self._crypttab(crypttab_path, kf_path, options=["luks", "key-slot=1"]) + + def _add_key(self, key_file: Path): + debug(f'Adding additional key-file {key_file}') + + command = f'/usr/bin/cryptsetup -q -v luksAddKey {self.luks_dev_path} {key_file}' + worker = SysCommandWorker(command, environment_vars={'LC_ALL': 'C'}) pw_injected = False + while worker.is_alive(): if b'Enter any existing passphrase' in worker and pw_injected is False: - worker.write(bytes(password, 'UTF-8')) + worker.write(self._password_bytes()) pw_injected = True if worker.exit_code != 0: - raise DiskError(f'Could not add encryption key {path} to {self.partition} because: {worker}') - - return True - - def crypttab(self, installation :Installer, key_path :str, options :List[str] = ["luks", "key-slot=1"]) -> None: - log(f'Adding a crypttab entry for key {key_path} in {installation}', level=logging.INFO) - with open(f"{installation.target}/etc/crypttab", "a") as crypttab: - crypttab.write(f"{self.mountpoint} UUID={convert_device_to_uuid(self.partition.path)} {key_path} {','.join(options)}\n") + raise DiskError(f'Could not add encryption key {key_file} to {self.luks_dev_path}: {worker.decode()}') + + def _crypttab( + self, + crypttab_path: Path, + key_file: Path, + options: List[str] + ) -> None: + debug(f'Adding crypttab entry for key {key_file}') + + with open(crypttab_path, 'a') as crypttab: + opt = ','.join(options) + uuid = self._get_luks_uuid() + row = f"{self.mapper_name} UUID={uuid} {key_file} {opt}\n" + crypttab.write(row) |