Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/luks.py
diff options
context:
space:
mode:
authorDaniel Girtler <blackrabbit256@gmail.com>2023-04-19 20:55:42 +1000
committerGitHub <noreply@github.com>2023-04-19 12:55:42 +0200
commit00b0ae7ba439a5a420095175b3bedd52c569db51 (patch)
treef02d081e361d5e65603f74dea3873dcc6606cf7c /archinstall/lib/luks.py
parent5253e57e9f26cf3e59cb2460544af13f56e485bb (diff)
PyParted and a large rewrite of the underlying partitioning (#1604)
* Invert mypy files * Add optional pre-commit hooks * New profile structure * Serialize profiles * Use profile instead of classmethod * Custom profile setup * Separator between back * Support profile import via url * Move profiles module * Refactor files * Remove symlink * Add user to docker group * Update schema description * Handle list services * mypy fixes * mypy fixes * Rename profilesv2 to profiles * flake8 * mypy again * Support selecting DM * Fix mypy * Cleanup * Update greeter setting * Update schema * Revert toml changes * Poc external dependencies * Dependency support * New encryption menu * flake8 * Mypy and flake8 * Unify lsblk command * Update bootloader configuration * Git hooks * Fix import * Pyparted * Remove custom font setting * flake8 * Remove default preview * Manual partitioning menu * Update structure * Disk configuration * Update filesystem * luks2 encryption * Everything works until installation * Btrfsutil * Btrfs handling * Update btrfs * Save encryption config * Fix pipewire issue * Update mypy version * Update all pre-commit * Update package versions * Revert audio/pipewire * Merge master PRs * Add master changes * Merge master changes * Small renaming * Pull master changes * Reset disk enc after disk config change * Generate locals * Update naming * Fix imports * Fix broken sync * Fix pre selection on table menu * Profile menu * Update profile * Fix post_install * Added python-pyparted to PKGBUILD, this requires [testing] to be enabled in order to run makepkg. Package still works via python -m build etc. * Swaped around some setuptools logic in pyproject Since we define `package-data` and `packages` there should be no need for: ``` [tool.setuptools.packages.find] where = ["archinstall", "archinstall.*"] ``` * Removed pyproject collisions. Duplicate definitions. * Made sure pyproject.toml includes languages * Add example and update README * Fix pyproject issues * Generate locale * Refactor imports * Simplify imports * Add profile description and package examples * Align code * Fix mypy * Simplify imports * Fix saving config * Fix wrong luks merge * Refactor installation * Fix cdrom device loading * Fix wrongly merged code * Fix imports and greeter * Don't terminate on partprobe error * Use specific path on partprobe from luks * Update archinstall/lib/disk/device_model.py Co-authored-by: codefiles <11915375+codefiles@users.noreply.github.com> * Update archinstall/lib/disk/device_model.py Co-authored-by: codefiles <11915375+codefiles@users.noreply.github.com> * Update github workflow to test archinstall installation * Update sway merge * Generate locales * Update workflow --------- Co-authored-by: Daniel Girtler <girtler.daniel@gmail.com> Co-authored-by: Anton Hvornum <anton@hvornum.se> Co-authored-by: Anton Hvornum <anton.feeds+github@gmail.com> Co-authored-by: codefiles <11915375+codefiles@users.noreply.github.com>
Diffstat (limited to 'archinstall/lib/luks.py')
-rw-r--r--archinstall/lib/luks.py331
1 files changed, 180 insertions, 151 deletions
diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py
index ad6bf093..fc531a06 100644
--- a/archinstall/lib/luks.py
+++ b/archinstall/lib/luks.py
@@ -1,92 +1,78 @@
from __future__ import annotations
-import json
+
import logging
-import os
-import pathlib
import shlex
import time
-from typing import Optional, List,TYPE_CHECKING
-# https://stackoverflow.com/a/39757388/929999
-if TYPE_CHECKING:
- from .installer import Installer
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Optional, List
-from .disk import Partition, convert_device_to_uuid
-from .general import SysCommand, SysCommandWorker
+from . import disk
+from .general import SysCommand, generate_password, SysCommandWorker
from .output import log
from .exceptions import SysCallError, DiskError
from .storage import storage
-from .disk.helpers import get_filesystem_type
-from .disk.mapperdev import MapperDev
-from .disk.btrfs import BTRFSPartition
-
-
-class luks2:
- def __init__(self,
- partition: Partition,
- mountpoint: Optional[str],
- password: Optional[str],
- key_file :Optional[str] = None,
- auto_unmount :bool = False,
- *args :str,
- **kwargs :str):
-
- self.password = password
- self.partition = partition
- self.mountpoint = mountpoint
- self.args = args
- self.kwargs = kwargs
- self.key_file = key_file
- self.auto_unmount = auto_unmount
- self.filesystem = 'crypto_LUKS'
- self.mapdev = None
-
- def __enter__(self) -> Partition:
- if not self.key_file:
- self.key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw" # TODO: Make disk-pw-file randomly unique?
-
- if type(self.password) != bytes:
- self.password = bytes(self.password, 'UTF-8')
-
- with open(self.key_file, 'wb') as fh:
- fh.write(self.password)
-
- return self.unlock(self.partition, self.mountpoint, self.key_file)
-
- def __exit__(self, *args :str, **kwargs :str) -> bool:
- # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
+
+
+@dataclass
+class Luks2:
+ luks_dev_path: Path
+ mapper_name: Optional[str] = None
+ password: Optional[str] = None
+ key_file: Optional[Path] = None
+ auto_unmount: bool = False
+
+ # will be set internally after unlocking the device
+ _mapper_dev: Optional[Path] = None
+
+ @property
+ def mapper_dev(self) -> Optional[Path]:
+ if self.mapper_name:
+ return Path(f'/dev/mapper/{self.mapper_name}')
+ return None
+
+ def __post_init__(self):
+ if self.luks_dev_path is None:
+ raise ValueError('Partition must have a path set')
+
+ def __enter__(self):
+ self.unlock(self.key_file)
+
+ def __exit__(self, *args: str, **kwargs: str):
if self.auto_unmount:
- self.close()
+ self.lock()
+
+ def _default_key_file(self) -> Path:
+ return Path(f'/tmp/{self.luks_dev_path.name}.disk_pw')
- if len(args) >= 2 and args[1]:
- raise args[1]
+ def _password_bytes(self) -> bytes:
+ if not self.password:
+ raise ValueError('Password for luks2 device was not specified')
- return True
+ if isinstance(self.password, bytes):
+ return self.password
+ else:
+ return bytes(self.password, 'UTF-8')
- def encrypt(self, partition :Partition,
- password :Optional[str] = None,
- key_size :int = 512,
- hash_type :str = 'sha512',
- iter_time :int = 10000,
- key_file :Optional[str] = None) -> str:
+ def encrypt(
+ self,
+ key_size: int = 512,
+ hash_type: str = 'sha512',
+ iter_time: int = 10000,
+ key_file: Optional[Path] = None
+ ) -> Path:
+ log(f'Luks2 encrypting: {self.luks_dev_path}', level=logging.INFO)
- log(f'Encrypting {partition} (This might take a while)', level=logging.INFO)
+ byte_password = self._password_bytes()
if not key_file:
if self.key_file:
key_file = self.key_file
else:
- key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw" # TODO: Make disk-pw-file randomly unique?
-
- if not password:
- password = self.password
-
- if type(password) != bytes:
- password = bytes(password, 'UTF-8')
+ key_file = self._default_key_file()
- with open(key_file, 'wb') as fh:
- fh.write(password)
-
- partition.partprobe()
+ with open(key_file, 'wb') as fh:
+ fh.write(byte_password)
cryptsetup_args = shlex.join([
'/usr/bin/cryptsetup',
@@ -97,120 +83,163 @@ class luks2:
'--hash', hash_type,
'--key-size', str(key_size),
'--iter-time', str(iter_time),
- '--key-file', os.path.abspath(key_file),
+ '--key-file', str(key_file),
'--use-urandom',
- 'luksFormat', partition.path,
+ 'luksFormat', str(self.luks_dev_path),
])
try:
# Retry formatting the volume because archinstall can some times be too quick
# which generates a "Device /dev/sdX does not exist or access denied." between
# setting up partitions and us trying to encrypt it.
+ cmd_handle = None
for i in range(storage['DISK_RETRY_ATTEMPTS']):
if (cmd_handle := SysCommand(cryptsetup_args)).exit_code != 0:
time.sleep(storage['DISK_TIMEOUTS'])
else:
break
- if cmd_handle.exit_code != 0:
- raise DiskError(f'Could not encrypt volume "{partition.path}": {b"".join(cmd_handle)}')
+ if cmd_handle is not None and cmd_handle.exit_code != 0:
+ output = str(b''.join(cmd_handle))
+ raise DiskError(f'Could not encrypt volume "{self.luks_dev_path}": {output}')
except SysCallError as err:
if err.exit_code == 1:
- log(f'{partition} is being used, trying to unmount and crypt-close the device and running one more attempt at encrypting the device.', level=logging.DEBUG)
- # Partition was in use, unmount it and try again
- partition.unmount()
-
- # Get crypt-information about the device by doing a reverse lookup starting with the partition path
- # For instance: /dev/sda
- SysCommand(f'bash -c "partprobe"')
- devinfo = json.loads(b''.join(SysCommand(f"lsblk --fs -J {partition.path}")).decode('UTF-8'))['blockdevices'][0]
-
- # For each child (sub-partition/sub-device)
- if len(children := devinfo.get('children', [])):
- for child in children:
- # Unmount the child location
- if child_mountpoint := child.get('mountpoint', None):
- log(f'Unmounting {child_mountpoint}', level=logging.DEBUG)
- SysCommand(f"umount -R {child_mountpoint}")
-
- # And close it if possible.
- log(f"Closing crypt device {child['name']}", level=logging.DEBUG)
- SysCommand(f"cryptsetup close {child['name']}")
+ log(f'luks2 partition currently in use: {self.luks_dev_path}')
+ log('Attempting to unmount, crypt-close and trying encryption again')
+ self.lock()
# Then try again to set up the crypt-device
- cmd_handle = SysCommand(cryptsetup_args)
+ SysCommand(cryptsetup_args)
else:
raise err
return key_file
- def unlock(self, partition :Partition, mountpoint :str, key_file :str) -> Partition:
+ def _get_luks_uuid(self) -> str:
+ command = f'/usr/bin/cryptsetup luksUUID {self.luks_dev_path}'
+
+ try:
+ result = SysCommand(command)
+ if result.exit_code != 0:
+ raise DiskError(f'Unable to get UUID for Luks device: {result.decode()}')
+
+ return result.decode() # type: ignore
+ except SysCallError as err:
+ log(f'Unable to get UUID for Luks device: {self.luks_dev_path}', level=logging.INFO)
+ raise err
+
+ def is_unlocked(self) -> bool:
+ return self.mapper_name is not None and Path(f'/dev/mapper/{self.mapper_name}').exists()
+
+ def unlock(self, key_file: Optional[Path] = None):
"""
- Mounts a luks2 compatible partition to a certain mountpoint.
- Keyfile must be specified as there's no way to interact with the pw-prompt atm.
+ Unlocks the luks device, an optional key file location for unlocking can be specified,
+ otherwise a default location for the key file will be used.
- :param mountpoint: The name without absolute path, for instance "luksdev" will point to /dev/mapper/luksdev
- :type mountpoint: str
+ :param key_file: An alternative key file
+ :type key_file: Path
"""
+ log(f'Unlocking luks2 device: {self.luks_dev_path}', level=logging.DEBUG)
+
+ if not self.mapper_name:
+ raise ValueError('mapper name missing')
+
+ byte_password = self._password_bytes()
+
+ if not key_file:
+ if self.key_file:
+ key_file = self.key_file
+ else:
+ key_file = self._default_key_file()
- if '/' in mountpoint:
- os.path.basename(mountpoint) # TODO: Raise exception instead?
+ with open(key_file, 'wb') as fh:
+ fh.write(byte_password)
wait_timer = time.time()
- while pathlib.Path(partition.path).exists() is False and time.time() - wait_timer < 10:
+ while Path(self.luks_dev_path).exists() is False and time.time() - wait_timer < 10:
time.sleep(0.025)
- SysCommand(f'/usr/bin/cryptsetup open {partition.path} {mountpoint} --key-file {os.path.abspath(key_file)} --type luks2')
- if os.path.islink(f'/dev/mapper/{mountpoint}'):
- self.mapdev = f'/dev/mapper/{mountpoint}'
-
- if (filesystem_type := get_filesystem_type(pathlib.Path(self.mapdev))) == 'btrfs':
- return BTRFSPartition(
- self.mapdev,
- block_device=MapperDev(mountpoint).partition.block_device,
- encrypted=True,
- filesystem=filesystem_type,
- autodetect_filesystem=False
- )
-
- return Partition(
- self.mapdev,
- block_device=MapperDev(mountpoint).partition.block_device,
- encrypted=True,
- filesystem=get_filesystem_type(self.mapdev),
- autodetect_filesystem=False
- )
-
- def close(self, mountpoint :Optional[str] = None) -> bool:
- if not mountpoint:
- mountpoint = self.mapdev
-
- SysCommand(f'/usr/bin/cryptsetup close {self.mapdev}')
- return os.path.islink(self.mapdev) is False
-
- def format(self, path :str) -> None:
- if (handle := SysCommand(f"/usr/bin/cryptsetup -q -v luksErase {path}")).exit_code != 0:
- raise DiskError(f'Could not format {path} with {self.filesystem} because: {b"".join(handle)}')
-
- def add_key(self, path :pathlib.Path, password :str) -> bool:
- if not path.exists():
- raise OSError(2, f"Could not import {path} as a disk encryption key, file is missing.", str(path))
-
- log(f'Adding additional key-file {path} for {self.partition}', level=logging.INFO)
- worker = SysCommandWorker(f"/usr/bin/cryptsetup -q -v luksAddKey {self.partition.path} {path}",
- environment_vars={'LC_ALL':'C'})
+ SysCommand(f'/usr/bin/cryptsetup open {self.luks_dev_path} {self.mapper_name} --key-file {key_file} --type luks2')
+
+ if not self.mapper_dev or not self.mapper_dev.is_symlink():
+ raise DiskError(f'Failed to open luks2 device: {self.luks_dev_path}')
+
+ def lock(self):
+ disk.device_handler.umount(self.luks_dev_path)
+
+ # Get crypt-information about the device by doing a reverse lookup starting with the partition path
+ # For instance: /dev/sda
+ disk.device_handler.partprobe(self.luks_dev_path)
+ lsblk_info = disk.get_lsblk_info(self.luks_dev_path)
+
+ # For each child (sub-partition/sub-device)
+ for child in lsblk_info.children:
+ # Unmount the child location
+ for mountpoint in child.mountpoints:
+ log(f'Unmounting {mountpoint}', level=logging.DEBUG)
+ disk.device_handler.umount(mountpoint, recursive=True)
+
+ # And close it if possible.
+ log(f"Closing crypt device {child.name}", level=logging.DEBUG)
+ SysCommand(f"cryptsetup close {child.name}")
+
+ self._mapper_dev = None
+
+ def create_keyfile(self, target_path: Path, override: bool = False):
+ """
+ Routine to create keyfiles, so it can be moved elsewhere
+ """
+ if self.mapper_name is None:
+ raise ValueError('Mapper name must be provided')
+
+ # Once we store the key as ../xyzloop.key systemd-cryptsetup can
+ # automatically load this key if we name the device to "xyzloop"
+ key_file_path = target_path / 'etc/cryptsetup-keys.d/' / self.mapper_name
+ key_file = key_file_path / '.key'
+ crypttab_path = target_path / 'etc/crypttab'
+
+ if key_file.exists():
+ if not override:
+ log(f'Key file {key_file} already exists, keeping existing')
+ return
+ else:
+ log(f'Key file {key_file} already exists, overriding')
+
+ key_file_path.mkdir(parents=True, exist_ok=True)
+
+ with open(key_file, "w") as keyfile:
+ keyfile.write(generate_password(length=512))
+
+ key_file_path.chmod(0o400)
+
+ self._add_key(key_file)
+ self._crypttab(crypttab_path, key_file, options=["luks", "key-slot=1"])
+
+ def _add_key(self, key_file: Path):
+ log(f'Adding additional key-file {key_file}', level=logging.INFO)
+
+ command = f'/usr/bin/cryptsetup -q -v luksAddKey {self.luks_dev_path} {key_file}'
+ worker = SysCommandWorker(command, environment_vars={'LC_ALL': 'C'})
pw_injected = False
+
while worker.is_alive():
if b'Enter any existing passphrase' in worker and pw_injected is False:
- worker.write(bytes(password, 'UTF-8'))
+ worker.write(self._password_bytes())
pw_injected = True
if worker.exit_code != 0:
- raise DiskError(f'Could not add encryption key {path} to {self.partition} because: {worker}')
-
- return True
-
- def crypttab(self, installation :Installer, key_path :str, options :List[str] = ["luks", "key-slot=1"]) -> None:
- log(f'Adding a crypttab entry for key {key_path} in {installation}', level=logging.INFO)
- with open(f"{installation.target}/etc/crypttab", "a") as crypttab:
- crypttab.write(f"{self.mountpoint} UUID={convert_device_to_uuid(self.partition.path)} {key_path} {','.join(options)}\n")
+ raise DiskError(f'Could not add encryption key {key_file} to {self.luks_dev_path}: {worker.decode()}')
+
+ def _crypttab(
+ self,
+ crypttab_path: Path,
+ key_file: Path,
+ options: List[str]
+ ) -> None:
+ log(f'Adding crypttab entry for key {key_file}', level=logging.INFO)
+
+ with open(crypttab_path, 'a') as crypttab:
+ opt = ','.join(options)
+ uuid = self._get_luks_uuid()
+ row = f"{self.mapper_name} UUID={uuid} {key_file} {opt}\n"
+ crypttab.write(row)