Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/luks.py
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/luks.py')
-rw-r--r--archinstall/lib/luks.py361
1 files changed, 198 insertions, 163 deletions
diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py
index ad6bf093..50e15cee 100644
--- a/archinstall/lib/luks.py
+++ b/archinstall/lib/luks.py
@@ -1,92 +1,77 @@
from __future__ import annotations
-import json
-import logging
-import os
-import pathlib
+
import shlex
import time
-from typing import Optional, List,TYPE_CHECKING
-# https://stackoverflow.com/a/39757388/929999
-if TYPE_CHECKING:
- from .installer import Installer
-
-from .disk import Partition, convert_device_to_uuid
-from .general import SysCommand, SysCommandWorker
-from .output import log
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Optional, List
+
+from . import disk
+from .general import SysCommand, generate_password, SysCommandWorker
+from .output import info, debug
from .exceptions import SysCallError, DiskError
from .storage import storage
-from .disk.helpers import get_filesystem_type
-from .disk.mapperdev import MapperDev
-from .disk.btrfs import BTRFSPartition
-
-
-class luks2:
- def __init__(self,
- partition: Partition,
- mountpoint: Optional[str],
- password: Optional[str],
- key_file :Optional[str] = None,
- auto_unmount :bool = False,
- *args :str,
- **kwargs :str):
-
- self.password = password
- self.partition = partition
- self.mountpoint = mountpoint
- self.args = args
- self.kwargs = kwargs
- self.key_file = key_file
- self.auto_unmount = auto_unmount
- self.filesystem = 'crypto_LUKS'
- self.mapdev = None
- def __enter__(self) -> Partition:
- if not self.key_file:
- self.key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw" # TODO: Make disk-pw-file randomly unique?
- if type(self.password) != bytes:
- self.password = bytes(self.password, 'UTF-8')
+@dataclass
+class Luks2:
+ luks_dev_path: Path
+ mapper_name: Optional[str] = None
+ password: Optional[str] = None
+ key_file: Optional[Path] = None
+ auto_unmount: bool = False
- with open(self.key_file, 'wb') as fh:
- fh.write(self.password)
+ # will be set internally after unlocking the device
+ _mapper_dev: Optional[Path] = None
- return self.unlock(self.partition, self.mountpoint, self.key_file)
+ @property
+ def mapper_dev(self) -> Optional[Path]:
+ if self.mapper_name:
+ return Path(f'/dev/mapper/{self.mapper_name}')
+ return None
- def __exit__(self, *args :str, **kwargs :str) -> bool:
- # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
+ def __post_init__(self):
+ if self.luks_dev_path is None:
+ raise ValueError('Partition must have a path set')
+
+ def __enter__(self):
+ self.unlock(self.key_file)
+
+ def __exit__(self, *args: str, **kwargs: str):
if self.auto_unmount:
- self.close()
+ self.lock()
+
+ def _default_key_file(self) -> Path:
+ return Path(f'/tmp/{self.luks_dev_path.name}.disk_pw')
- if len(args) >= 2 and args[1]:
- raise args[1]
+ def _password_bytes(self) -> bytes:
+ if not self.password:
+ raise ValueError('Password for luks2 device was not specified')
- return True
+ if isinstance(self.password, bytes):
+ return self.password
+ else:
+ return bytes(self.password, 'UTF-8')
- def encrypt(self, partition :Partition,
- password :Optional[str] = None,
- key_size :int = 512,
- hash_type :str = 'sha512',
- iter_time :int = 10000,
- key_file :Optional[str] = None) -> str:
+ def encrypt(
+ self,
+ key_size: int = 512,
+ hash_type: str = 'sha512',
+ iter_time: int = 10000,
+ key_file: Optional[Path] = None
+ ) -> Path:
+ debug(f'Luks2 encrypting: {self.luks_dev_path}')
- log(f'Encrypting {partition} (This might take a while)', level=logging.INFO)
+ byte_password = self._password_bytes()
if not key_file:
if self.key_file:
key_file = self.key_file
else:
- key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw" # TODO: Make disk-pw-file randomly unique?
+ key_file = self._default_key_file()
- if not password:
- password = self.password
-
- if type(password) != bytes:
- password = bytes(password, 'UTF-8')
-
- with open(key_file, 'wb') as fh:
- fh.write(password)
-
- partition.partprobe()
+ with open(key_file, 'wb') as fh:
+ fh.write(byte_password)
cryptsetup_args = shlex.join([
'/usr/bin/cryptsetup',
@@ -97,120 +82,170 @@ class luks2:
'--hash', hash_type,
'--key-size', str(key_size),
'--iter-time', str(iter_time),
- '--key-file', os.path.abspath(key_file),
+ '--key-file', str(key_file),
'--use-urandom',
- 'luksFormat', partition.path,
+ 'luksFormat', str(self.luks_dev_path),
])
- try:
- # Retry formatting the volume because archinstall can some times be too quick
- # which generates a "Device /dev/sdX does not exist or access denied." between
- # setting up partitions and us trying to encrypt it.
- for i in range(storage['DISK_RETRY_ATTEMPTS']):
- if (cmd_handle := SysCommand(cryptsetup_args)).exit_code != 0:
- time.sleep(storage['DISK_TIMEOUTS'])
+ debug(f'cryptsetup format: {cryptsetup_args}')
+
+ # Retry formatting the volume because archinstall can some times be too quick
+ # which generates a "Device /dev/sdX does not exist or access denied." between
+ # setting up partitions and us trying to encrypt it.
+ for retry_attempt in range(storage['DISK_RETRY_ATTEMPTS'] + 1):
+ try:
+ result = SysCommand(cryptsetup_args).decode()
+ debug(f'cryptsetup luksFormat output: {result}')
+ break
+ except SysCallError as err:
+ time.sleep(storage['DISK_TIMEOUTS'])
+
+ if retry_attempt != storage['DISK_RETRY_ATTEMPTS']:
+ continue
+
+ if err.exit_code == 1:
+ info(f'luks2 partition currently in use: {self.luks_dev_path}')
+ info('Attempting to unmount, crypt-close and trying encryption again')
+
+ self.lock()
+ # Then try again to set up the crypt-device
+ result = SysCommand(cryptsetup_args).decode()
+ debug(f'cryptsetup luksFormat output: {result}')
else:
- break
+ raise DiskError(f'Could not encrypt volume "{self.luks_dev_path}": {err}')
- if cmd_handle.exit_code != 0:
- raise DiskError(f'Could not encrypt volume "{partition.path}": {b"".join(cmd_handle)}')
- except SysCallError as err:
- if err.exit_code == 1:
- log(f'{partition} is being used, trying to unmount and crypt-close the device and running one more attempt at encrypting the device.', level=logging.DEBUG)
- # Partition was in use, unmount it and try again
- partition.unmount()
-
- # Get crypt-information about the device by doing a reverse lookup starting with the partition path
- # For instance: /dev/sda
- SysCommand(f'bash -c "partprobe"')
- devinfo = json.loads(b''.join(SysCommand(f"lsblk --fs -J {partition.path}")).decode('UTF-8'))['blockdevices'][0]
-
- # For each child (sub-partition/sub-device)
- if len(children := devinfo.get('children', [])):
- for child in children:
- # Unmount the child location
- if child_mountpoint := child.get('mountpoint', None):
- log(f'Unmounting {child_mountpoint}', level=logging.DEBUG)
- SysCommand(f"umount -R {child_mountpoint}")
-
- # And close it if possible.
- log(f"Closing crypt device {child['name']}", level=logging.DEBUG)
- SysCommand(f"cryptsetup close {child['name']}")
-
- # Then try again to set up the crypt-device
- cmd_handle = SysCommand(cryptsetup_args)
- else:
- raise err
+ self.key_file = key_file
return key_file
- def unlock(self, partition :Partition, mountpoint :str, key_file :str) -> Partition:
+ def _get_luks_uuid(self) -> str:
+ command = f'/usr/bin/cryptsetup luksUUID {self.luks_dev_path}'
+
+ try:
+ return SysCommand(command).decode()
+ except SysCallError as err:
+ info(f'Unable to get UUID for Luks device: {self.luks_dev_path}')
+ raise err
+
+ def is_unlocked(self) -> bool:
+ return self.mapper_name is not None and Path(f'/dev/mapper/{self.mapper_name}').exists()
+
+ def unlock(self, key_file: Optional[Path] = None):
"""
- Mounts a luks2 compatible partition to a certain mountpoint.
- Keyfile must be specified as there's no way to interact with the pw-prompt atm.
+ Unlocks the luks device, an optional key file location for unlocking can be specified,
+ otherwise a default location for the key file will be used.
- :param mountpoint: The name without absolute path, for instance "luksdev" will point to /dev/mapper/luksdev
- :type mountpoint: str
+ :param key_file: An alternative key file
+ :type key_file: Path
"""
+ debug(f'Unlocking luks2 device: {self.luks_dev_path}')
- if '/' in mountpoint:
- os.path.basename(mountpoint) # TODO: Raise exception instead?
+ if not self.mapper_name:
+ raise ValueError('mapper name missing')
+
+ byte_password = self._password_bytes()
+
+ if not key_file:
+ if self.key_file:
+ key_file = self.key_file
+ else:
+ key_file = self._default_key_file()
+
+ with open(key_file, 'wb') as fh:
+ fh.write(byte_password)
wait_timer = time.time()
- while pathlib.Path(partition.path).exists() is False and time.time() - wait_timer < 10:
+ while Path(self.luks_dev_path).exists() is False and time.time() - wait_timer < 10:
time.sleep(0.025)
- SysCommand(f'/usr/bin/cryptsetup open {partition.path} {mountpoint} --key-file {os.path.abspath(key_file)} --type luks2')
- if os.path.islink(f'/dev/mapper/{mountpoint}'):
- self.mapdev = f'/dev/mapper/{mountpoint}'
-
- if (filesystem_type := get_filesystem_type(pathlib.Path(self.mapdev))) == 'btrfs':
- return BTRFSPartition(
- self.mapdev,
- block_device=MapperDev(mountpoint).partition.block_device,
- encrypted=True,
- filesystem=filesystem_type,
- autodetect_filesystem=False
- )
-
- return Partition(
- self.mapdev,
- block_device=MapperDev(mountpoint).partition.block_device,
- encrypted=True,
- filesystem=get_filesystem_type(self.mapdev),
- autodetect_filesystem=False
- )
-
- def close(self, mountpoint :Optional[str] = None) -> bool:
- if not mountpoint:
- mountpoint = self.mapdev
-
- SysCommand(f'/usr/bin/cryptsetup close {self.mapdev}')
- return os.path.islink(self.mapdev) is False
-
- def format(self, path :str) -> None:
- if (handle := SysCommand(f"/usr/bin/cryptsetup -q -v luksErase {path}")).exit_code != 0:
- raise DiskError(f'Could not format {path} with {self.filesystem} because: {b"".join(handle)}')
-
- def add_key(self, path :pathlib.Path, password :str) -> bool:
- if not path.exists():
- raise OSError(2, f"Could not import {path} as a disk encryption key, file is missing.", str(path))
-
- log(f'Adding additional key-file {path} for {self.partition}', level=logging.INFO)
- worker = SysCommandWorker(f"/usr/bin/cryptsetup -q -v luksAddKey {self.partition.path} {path}",
- environment_vars={'LC_ALL':'C'})
+ result = SysCommand(
+ '/usr/bin/cryptsetup open '
+ f'{self.luks_dev_path} '
+ f'{self.mapper_name} '
+ f'--key-file {key_file} '
+ f'--type luks2'
+ ).decode()
+
+ debug(f'cryptsetup open output: {result}')
+
+ if not self.mapper_dev or not self.mapper_dev.is_symlink():
+ raise DiskError(f'Failed to open luks2 device: {self.luks_dev_path}')
+
+ def lock(self):
+ disk.device_handler.umount(self.luks_dev_path)
+
+ # Get crypt-information about the device by doing a reverse lookup starting with the partition path
+ # For instance: /dev/sda
+ lsblk_info = disk.get_lsblk_info(self.luks_dev_path)
+
+ # For each child (sub-partition/sub-device)
+ for child in lsblk_info.children:
+ # Unmount the child location
+ for mountpoint in child.mountpoints:
+ debug(f'Unmounting {mountpoint}')
+ disk.device_handler.umount(mountpoint, recursive=True)
+
+ # And close it if possible.
+ debug(f"Closing crypt device {child.name}")
+ SysCommand(f"cryptsetup close {child.name}")
+
+ self._mapper_dev = None
+
+ def create_keyfile(self, target_path: Path, override: bool = False):
+ """
+ Routine to create keyfiles, so it can be moved elsewhere
+ """
+ if self.mapper_name is None:
+ raise ValueError('Mapper name must be provided')
+
+ # Once we store the key as ../xyzloop.key systemd-cryptsetup can
+ # automatically load this key if we name the device to "xyzloop"
+ kf_path = Path(f'/etc/cryptsetup-keys.d/{self.mapper_name}.key')
+ key_file = target_path / kf_path.relative_to(kf_path.root)
+ crypttab_path = target_path / 'etc/crypttab'
+
+ if key_file.exists():
+ if not override:
+ info(f'Key file {key_file} already exists, keeping existing')
+ return
+ else:
+ info(f'Key file {key_file} already exists, overriding')
+
+ key_file.parent.mkdir(parents=True, exist_ok=True)
+
+ pwd = generate_password(length=512)
+ key_file.write_text(pwd)
+
+ key_file.chmod(0o400)
+
+ self._add_key(key_file)
+ self._crypttab(crypttab_path, kf_path, options=["luks", "key-slot=1"])
+
+ def _add_key(self, key_file: Path):
+ debug(f'Adding additional key-file {key_file}')
+
+ command = f'/usr/bin/cryptsetup -q -v luksAddKey {self.luks_dev_path} {key_file}'
+ worker = SysCommandWorker(command, environment_vars={'LC_ALL': 'C'})
pw_injected = False
+
while worker.is_alive():
if b'Enter any existing passphrase' in worker and pw_injected is False:
- worker.write(bytes(password, 'UTF-8'))
+ worker.write(self._password_bytes())
pw_injected = True
if worker.exit_code != 0:
- raise DiskError(f'Could not add encryption key {path} to {self.partition} because: {worker}')
-
- return True
-
- def crypttab(self, installation :Installer, key_path :str, options :List[str] = ["luks", "key-slot=1"]) -> None:
- log(f'Adding a crypttab entry for key {key_path} in {installation}', level=logging.INFO)
- with open(f"{installation.target}/etc/crypttab", "a") as crypttab:
- crypttab.write(f"{self.mountpoint} UUID={convert_device_to_uuid(self.partition.path)} {key_path} {','.join(options)}\n")
+ raise DiskError(f'Could not add encryption key {key_file} to {self.luks_dev_path}: {worker.decode()}')
+
+ def _crypttab(
+ self,
+ crypttab_path: Path,
+ key_file: Path,
+ options: List[str]
+ ) -> None:
+ debug(f'Adding crypttab entry for key {key_file}')
+
+ with open(crypttab_path, 'a') as crypttab:
+ opt = ','.join(options)
+ uuid = self._get_luks_uuid()
+ row = f"{self.mapper_name} UUID={uuid} {key_file} {opt}\n"
+ crypttab.write(row)