Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib')
-rw-r--r--archinstall/lib/disk.py651
-rw-r--r--archinstall/lib/general.py12
-rw-r--r--archinstall/lib/installer.py25
-rw-r--r--archinstall/lib/luks.py7
-rw-r--r--archinstall/lib/storage.py3
-rw-r--r--archinstall/lib/user_interaction.py235
6 files changed, 788 insertions, 145 deletions
diff --git a/archinstall/lib/disk.py b/archinstall/lib/disk.py
index de39bafd..e8deb619 100644
--- a/archinstall/lib/disk.py
+++ b/archinstall/lib/disk.py
@@ -2,7 +2,6 @@ import glob
import pathlib
import re
import time
-from collections import OrderedDict
from typing import Optional
from .general import *
@@ -14,10 +13,218 @@ GPT = 0b00000001
MBR = 0b00000010
-# import ctypes
-# import ctypes.util
-# libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)
-# libc.mount.argtypes = (ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_ulong, ctypes.c_char_p)
+def valid_parted_position(pos :str):
+ if not len(pos):
+ return False
+
+ if pos.isdigit():
+ return True
+
+ if pos[-1] == '%' and pos[:-1].isdigit():
+ return True
+
+ if pos[-3:].lower() in ['mib', 'kib', 'b', 'tib'] and pos[:-3].replace(".", "", 1).isdigit():
+ return True
+
+ if pos[-2:].lower() in ['kb', 'mb', 'gb', 'tb'] and pos[:-2].replace(".", "", 1).isdigit():
+ return True
+
+ return False
+
+def valid_fs_type(fstype :str) -> bool:
+ # https://www.gnu.org/software/parted/manual/html_node/mkpart.html
+ # Above link doesn't agree with `man parted` /mkpart documentation:
+ """
+ fs-type can
+ be one of "btrfs", "ext2",
+ "ext3", "ext4", "fat16",
+ "fat32", "hfs", "hfs+",
+ "linux-swap", "ntfs", "reis‐
+ erfs", "udf", or "xfs".
+ """
+
+ return fstype.lower() in [
+ "btrfs",
+ "ext2",
+ "ext3", "ext4", # `man parted` allows these
+ "fat16", "fat32",
+ "hfs", "hfs+", # "hfsx", not included in `man parted`
+ "linux-swap",
+ "ntfs",
+ "reiserfs",
+ "udf", # "ufs", not included in `man parted`
+ "xfs", # `man parted` allows this
+ ]
+
+
+def sort_block_devices_based_on_performance(block_devices):
+ result = {device: 0 for device in block_devices}
+
+ for device, weight in result.items():
+ if device.spinning:
+ weight -= 10
+ else:
+ weight += 5
+
+ if device.bus_type == 'nvme':
+ weight += 20
+ elif device.bus_type == 'sata':
+ weight += 10
+
+ result[device] = weight
+
+ return result
+
+def filter_disks_below_size_in_gb(devices, gigabytes):
+ for disk in devices:
+ if disk.size >= gigabytes:
+ yield disk
+
+def select_largest_device(devices, gigabytes, filter_out=None):
+ if not filter_out:
+ filter_out = []
+
+ copy_devices = [*devices]
+ for filter_device in filter_out:
+ if filter_device in copy_devices:
+ copy_devices.pop(copy_devices.index(filter_device))
+
+ copy_devices = list(filter_disks_below_size_in_gb(copy_devices, gigabytes))
+
+ if not len(copy_devices):
+ return None
+
+ return max(copy_devices, key=(lambda device : device.size))
+
+def select_disk_larger_than_or_close_to(devices, gigabytes, filter_out=None):
+ if not filter_out:
+ filter_out = []
+
+ copy_devices = [*devices]
+ for filter_device in filter_out:
+ if filter_device in copy_devices:
+ copy_devices.pop(copy_devices.index(filter_device))
+
+ if not len(copy_devices):
+ return None
+
+ return min(copy_devices, key=(lambda device : abs(device.size - gigabytes)))
+
+def suggest_single_disk_layout(block_device):
+ MIN_SIZE_TO_ALLOW_HOME_PART = 40 # Gb
+
+ layout = {
+ block_device : {
+ "wipe" : True,
+ "partitions" : []
+ }
+ }
+
+ layout[block_device]['partitions'].append({
+ # Boot
+ "type" : "primary",
+ "start" : "1MiB",
+ "size" : "513MiB",
+ "boot" : True,
+ "encrypted" : False,
+ "format" : True,
+ "mountpoint" : "/boot",
+ "filesystem" : {
+ "format" : "fat32"
+ }
+ })
+ layout[block_device]['partitions'].append({
+ # Root
+ "type" : "primary",
+ "start" : "513MiB",
+ "encrypted" : False,
+ "format" : True,
+ "size" : "100%" if block_device.size < MIN_SIZE_TO_ALLOW_HOME_PART else f"{min(block_device.size, 20)*1024}MiB",
+ "mountpoint" : "/",
+ "filesystem" : {
+ "format" : "btrfs"
+ }
+ })
+
+ if block_device.size >= MIN_SIZE_TO_ALLOW_HOME_PART:
+ layout[block_device]['partitions'].append({
+ # Home
+ "type" : "primary",
+ "encrypted" : False,
+ "format" : True,
+ "start" : f"{min(block_device.size*0.2, 20)*1024}MiB",
+ "size" : "100%",
+ "mountpoint" : "/home",
+ "filesystem" : {
+ "format" : "btrfs"
+ }
+ })
+
+ return layout
+
+
+def suggest_multi_disk_layout(block_devices):
+ MIN_SIZE_TO_ALLOW_HOME_PART = 40 # Gb
+ ARCH_LINUX_INSTALLED_SIZE = 20 # Gb, rough estimate taking in to account user desktops etc. TODO: Catch user packages to detect size?
+
+ block_devices = sort_block_devices_based_on_performance(block_devices).keys()
+
+ home_device = select_largest_device(block_devices, gigabytes=MIN_SIZE_TO_ALLOW_HOME_PART)
+ root_device = select_disk_larger_than_or_close_to(block_devices, gigabytes=ARCH_LINUX_INSTALLED_SIZE, filter_out=[home_device])
+
+ log(f"Suggesting multi-disk-layout using {len(block_devices)} disks, where {root_device} will be /root and {home_device} will be /home", level=logging.DEBUG)
+
+ layout = {
+ root_device : {
+ "wipe" : True,
+ "partitions" : []
+ },
+ home_device : {
+ "wipe" : True,
+ "partitions" : []
+ },
+ }
+
+ layout[root_device]['partitions'].append({
+ # Boot
+ "type" : "primary",
+ "start" : "1MiB",
+ "size" : "513MiB",
+ "boot" : True,
+ "encrypted" : False,
+ "format" : True,
+ "mountpoint" : "/boot",
+ "filesystem" : {
+ "format" : "fat32"
+ }
+ })
+ layout[root_device]['partitions'].append({
+ # Root
+ "type" : "primary",
+ "start" : "513MiB",
+ "encrypted" : False,
+ "format" : True,
+ "size" : "100%",
+ "mountpoint" : "/",
+ "filesystem" : {
+ "format" : "btrfs"
+ }
+ })
+
+ layout[home_device]['partitions'].append({
+ # Home
+ "type" : "primary",
+ "encrypted" : False,
+ "format" : True,
+ "start" : "4MiB",
+ "size" : "100%",
+ "mountpoint" : "/home",
+ "filesystem" : {
+ "format" : "btrfs"
+ }
+ })
+
+ return layout
class BlockDevice:
@@ -30,14 +237,14 @@ class BlockDevice:
self.path = path
self.info = info
self.keep_partitions = True
- self.part_cache = OrderedDict()
+ self.part_cache = {}
+
# TODO: Currently disk encryption is a BIT misleading.
# It's actually partition-encryption, but for future-proofing this
# I'm placing the encryption password on a BlockDevice level.
- self.encryption_password = None
def __repr__(self, *args, **kwargs):
- return f"BlockDevice({self.device})"
+ return f"BlockDevice({self.device}, size={self.size}GB, free_space={'+'.join(part[2] for part in self.free_space)}, bus_type={self.bus_type})"
def __iter__(self):
for partition in self.partitions:
@@ -48,25 +255,36 @@ class BlockDevice:
raise KeyError(f'{self} does not contain information: "{key}"')
return self.info[key]
+ def __len__(self):
+ return len(self.partitions)
+
+ def __lt__(self, left_comparitor):
+ return self.path < left_comparitor.path
+
def json(self):
"""
json() has precedence over __dump__, so this is a way
to give less/partial information for user readability.
"""
- return {
- 'path': self.path,
- 'size': self.info['size'] if 'size' in self.info else '<unknown>',
- 'model': self.info['model'] if 'model' in self.info else '<unknown>'
- }
+ return self.path
def __dump__(self):
return {
- 'path': self.path,
- 'info': self.info,
- 'partition_cache': self.part_cache
+ self.path : {
+ 'partuuid' : self.uuid,
+ 'wipe' : self.info.get('wipe', None),
+ 'partitions' : [part.__dump__() for part in self.partitions.values()]
+ }
}
@property
+ def partition_type(self):
+ output = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.path}").decode('UTF-8'))
+
+ for device in output['blockdevices']:
+ return device['pttype']
+
+ @property
def device(self):
"""
Returns the actual device-endpoint of the BlockDevice.
@@ -78,7 +296,7 @@ class BlockDevice:
raise DiskError(f'Could not locate backplane info for "{self.path}"')
if self.info['type'] == 'loop':
- for drive in json.loads(b''.join(SysCommand(['losetup', '--json'])).decode('UTF_8'))['loopdevices']:
+ for drive in json.loads(SysCommand(['losetup', '--json']).decode('UTF_8'))['loopdevices']:
if not drive['name'] == self.path:
continue
@@ -100,18 +318,17 @@ class BlockDevice:
@property
def partitions(self):
- o = b''.join(SysCommand(['partprobe', self.path]))
+ SysCommand(['partprobe', self.path])
- # o = b''.join(sys_command('/usr/bin/lsblk -o name -J -b {dev}'.format(dev=dev)))
- o = b''.join(SysCommand(['/usr/bin/lsblk', '-J', self.path]))
+ result = SysCommand(['/usr/bin/lsblk', '-J', self.path])
- if b'not a block device' in o:
+ if b'not a block device' in result:
raise DiskError(f'Can not read partitions off something that isn\'t a block device: {self.path}')
- if not o[:1] == b'{':
+ if not result[:1] == b'{':
raise DiskError('Error getting JSON output from:', f'/usr/bin/lsblk -J {self.path}')
- r = json.loads(o.decode('UTF-8'))
+ r = json.loads(result.decode('UTF-8'))
if len(r['blockdevices']) and 'children' in r['blockdevices'][0]:
root_path = f"/dev/{r['blockdevices'][0]['name']}"
for part in r['blockdevices'][0]['children']:
@@ -140,10 +357,65 @@ class BlockDevice:
This is more reliable than relying on /dev/disk/by-partuuid as
it doesn't seam to be able to detect md raid partitions.
"""
- lsblk = b''.join(SysCommand(f'lsblk -J -o+UUID {self.path}'))
- for partition in json.loads(lsblk.decode('UTF-8'))['blockdevices']:
+ for partition in json.loads(SysCommand(f'lsblk -J -o+UUID {self.path}').decode('UTF-8'))['blockdevices']:
return partition.get('uuid', None)
+ def convert_size_to_gb(self, size):
+ units = {
+ 'P' : lambda s : float(s) * 2048,
+ 'T' : lambda s : float(s) * 1024,
+ 'G' : lambda s : float(s),
+ 'M' : lambda s : float(s) / 1024,
+ 'K' : lambda s : float(s) / 2048,
+ 'B' : lambda s : float(s) / 3072,
+ }
+ unit = size[-1]
+ return float(units.get(unit, lambda s : None)(size[:-1]))
+
+ @property
+ def size(self):
+ output = json.loads(SysCommand(f"lsblk --json -o+SIZE {self.path}").decode('UTF-8'))
+
+ for device in output['blockdevices']:
+ return self.convert_size_to_gb(device['size'])
+
+ @property
+ def bus_type(self):
+ output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8'))
+
+ for device in output['blockdevices']:
+ return device['tran']
+
+ @property
+ def spinning(self):
+ output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8'))
+
+ for device in output['blockdevices']:
+ return device['rota'] is True
+
+ @property
+ def free_space(self):
+ # NOTE: parted -s will default to `cancel` on prompt, skipping any partition
+ # that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso,
+ # so the free will ignore the ESP partition and just give the "free" space.
+ # Doesn't harm us, but worth noting in case something weird happens.
+ for line in SysCommand(f"parted -s --machine {self.path} print free"):
+ if 'free' in (free_space := line.decode('UTF-8')):
+ _, start, end, size, *_ = free_space.strip('\r\n;').split(':')
+ yield (start, end, size)
+
+ @property
+ def largest_free_space(self):
+ info = None
+ for space_info in self.free_space:
+ if not info:
+ info = space_info
+ else:
+ # [-1] = size
+ if space_info[-1] > info[-1]:
+ info = space_info
+ return info
+
def has_partitions(self):
return len(self.partitions)
@@ -154,7 +426,12 @@ class BlockDevice:
return False
def flush_cache(self):
- self.part_cache = OrderedDict()
+ self.part_cache = {}
+
+ def get_partition(self, uuid):
+ for partition in self:
+ if partition.uuid == uuid:
+ return partition
class Partition:
@@ -171,7 +448,7 @@ class Partition:
self.size = size # TODO: Refresh?
self._encrypted = None
self.encrypted = encrypted
- self.allow_formatting = False # A fail-safe for unconfigured partitions, such as windows NTFS partitions.
+ self.allow_formatting = False
if mountpoint:
self.mount(mountpoint)
@@ -206,9 +483,80 @@ class Partition:
mount_repr = f", rel_mountpoint={self.target_mountpoint}"
if self._encrypted:
- return f'Partition(path={self.path}, size={self.size}, real_device={self.real_device}, fs={self.filesystem}{mount_repr})'
+ return f'Partition(path={self.path}, size={self.size}, PARTUUID={self.uuid}, parent={self.real_device}, fs={self.filesystem}{mount_repr})'
else:
- return f'Partition(path={self.path}, size={self.size}, fs={self.filesystem}{mount_repr})'
+ return f'Partition(path={self.path}, size={self.size}, PARTUUID={self.uuid}, fs={self.filesystem}{mount_repr})'
+
+ def __dump__(self):
+ return {
+ 'type' : 'primary',
+ 'PARTUUID' : self.uuid,
+ 'wipe' : self.allow_formatting,
+ 'boot' : self.boot,
+ 'ESP' : self.boot,
+ 'mountpoint' : self.target_mountpoint,
+ 'encrypted' : self._encrypted,
+ 'start' : self.start,
+ 'size' : self.end,
+ 'filesystem' : {
+ 'format' : get_filesystem_type(self.path)
+ }
+ }
+
+ @property
+ def sector_size(self):
+ output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.path}").decode('UTF-8'))
+
+ for device in output['blockdevices']:
+ return device.get('log-sec', None)
+
+ @property
+ def start(self):
+ output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
+
+ for partition in output.get('partitiontable', {}).get('partitions', []):
+ if partition['node'] == self.path:
+ return partition['start']# * self.sector_size
+
+ @property
+ def end(self):
+ # TODO: Verify that the logic holds up, that 'size' is the size without 'start' added to it.
+ output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
+
+ for partition in output.get('partitiontable', {}).get('partitions', []):
+ if partition['node'] == self.path:
+ return partition['size']# * self.sector_size
+
+ @property
+ def boot(self):
+ output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
+
+ # Get the bootable flag from the sfdisk output:
+ # {
+ # "partitiontable": {
+ # "label":"dos",
+ # "id":"0xd202c10a",
+ # "device":"/dev/loop0",
+ # "unit":"sectors",
+ # "sectorsize":512,
+ # "partitions": [
+ # {"node":"/dev/loop0p1", "start":2048, "size":10483712, "type":"83", "bootable":true}
+ # ]
+ # }
+ # }
+
+ for partition in output.get('partitiontable', {}).get('partitions', []):
+ if partition['node'] == self.path:
+ return partition.get('bootable', False)
+
+ return False
+
+ @property
+ def partition_type(self):
+ lsblk = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.path}").decode('UTF-8'))
+
+ for device in lsblk['blockdevices']:
+ return device['pttype']
@property
def uuid(self) -> Optional[str]:
@@ -217,8 +565,9 @@ class Partition:
This is more reliable than relying on /dev/disk/by-partuuid as
it doesn't seam to be able to detect md raid partitions.
"""
- lsblk = b''.join(SysCommand(f'lsblk -J -o+PARTUUID {self.path}'))
- for partition in json.loads(lsblk.decode('UTF-8'))['blockdevices']:
+
+ lsblk = json.loads(SysCommand(f'lsblk -J -o+PARTUUID {self.path}').decode('UTF-8'))
+ for partition in lsblk['blockdevices']:
return partition.get('partuuid', None)
return None
@@ -237,7 +586,7 @@ class Partition:
@property
def real_device(self):
- for blockdevice in json.loads(b''.join(SysCommand('lsblk -J')).decode('UTF-8'))['blockdevices']:
+ for blockdevice in json.loads(SysCommand('lsblk -J').decode('UTF-8'))['blockdevices']:
if parent := self.find_parent_of(blockdevice, os.path.basename(self.path)):
return f"/dev/{parent}"
# raise DiskError(f'Could not find appropriate parent for encrypted partition {self}')
@@ -248,7 +597,7 @@ class Partition:
from .luks import luks2
try:
- with luks2(self, 'luksloop', password, auto_unmount=True) as unlocked_device:
+ with luks2(self, storage.get('ENC_IDENTIFIER', 'ai')+'loop', password, auto_unmount=True) as unlocked_device:
return unlocked_device.filesystem
except SysCallError:
return None
@@ -274,39 +623,16 @@ class Partition:
return True if files > 0 else False
- def safe_to_format(self):
- if self.allow_formatting is False:
- log(f"Partition {self} is not marked for formatting.", level=logging.DEBUG)
- return False
- elif self.target_mountpoint == '/boot':
- try:
- if self.has_content():
- log(f"Partition {self} is a boot partition and has content inside.", level=logging.DEBUG)
- return False
- except SysCallError as err:
- log(err.message, logging.DEBUG)
- log(f"Partition {self} was identified as /boot but we could not mount to check for content, continuing!", level=logging.DEBUG)
- pass
-
- return True
-
def encrypt(self, *args, **kwargs):
"""
A wrapper function for luks2() instances and the .encrypt() method of that instance.
"""
from .luks import luks2
- if not self._encrypted:
- raise DiskError(f"Attempting to encrypt a partition that was not marked for encryption: {self}")
-
- if not self.safe_to_format():
- log(f"Partition {self} was marked as protected but encrypt() was called on it!", level=logging.ERROR, fg="red")
- return False
-
handle = luks2(self, None, None)
return handle.encrypt(self, *args, **kwargs)
- def format(self, filesystem=None, path=None, allow_formatting=None, log_formatting=True):
+ def format(self, filesystem=None, path=None, log_formatting=True):
"""
Format can be given an overriding path, for instance /dev/null to test
the formatting functionality and in essence the support for the given filesystem.
@@ -316,52 +642,46 @@ class Partition:
if path is None:
path = self.path
- if allow_formatting is None:
- allow_formatting = self.allow_formatting
# To avoid "unable to open /dev/x: No such file or directory"
start_wait = time.time()
while pathlib.Path(path).exists() is False and time.time() - start_wait < 10:
time.sleep(0.025)
- if not allow_formatting:
- raise PermissionError(f"{self} is not formatable either because instance is locked ({self.allow_formatting}) or a blocking flag was given ({allow_formatting})")
-
if log_formatting:
log(f'Formatting {path} -> {filesystem}', level=logging.INFO)
if filesystem == 'btrfs':
- o = b''.join(SysCommand(f'/usr/bin/mkfs.btrfs -f {path}'))
- if b'UUID' not in o:
- raise DiskError(f'Could not format {path} with {filesystem} because: {o}')
- self.filesystem = 'btrfs'
+ if 'UUID:' not in (mkfs := SysCommand(f'/usr/bin/mkfs.btrfs -f {path}').decode('UTF-8')):
+ raise DiskError(f'Could not format {path} with {filesystem} because: {mkfs}')
+ self.filesystem = filesystem
- elif filesystem == 'vfat':
- o = b''.join(SysCommand(f'/usr/bin/mkfs.vfat -F32 {path}'))
- if (b'mkfs.fat' not in o and b'mkfs.vfat' not in o) or b'command not found' in o:
- raise DiskError(f'Could not format {path} with {filesystem} because: {o}')
- self.filesystem = 'vfat'
+ elif filesystem == 'fat32':
+ mkfs = SysCommand(f'/usr/bin/mkfs.vfat -F32 {path}').decode('UTF-8')
+ if ('mkfs.fat' not in mkfs and 'mkfs.vfat' not in mkfs) or 'command not found' in mkfs:
+ raise DiskError(f"Could not format {path} with {filesystem} because: {mkfs}")
+ self.filesystem = filesystem
elif filesystem == 'ext4':
if (handle := SysCommand(f'/usr/bin/mkfs.ext4 -F {path}')).exit_code != 0:
- raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}')
- self.filesystem = 'ext4'
+ raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
+ self.filesystem = filesystem
elif filesystem == 'xfs':
if (handle := SysCommand(f'/usr/bin/mkfs.xfs -f {path}')).exit_code != 0:
- raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}')
- self.filesystem = 'xfs'
+ raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
+ self.filesystem = filesystem
elif filesystem == 'f2fs':
if (handle := SysCommand(f'/usr/bin/mkfs.f2fs -f {path}')).exit_code != 0:
- raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}')
- self.filesystem = 'f2fs'
+ raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
+ self.filesystem = filesystem
elif filesystem == 'crypto_LUKS':
# from .luks import luks2
# encrypted_partition = luks2(self, None, None)
# encrypted_partition.format(path)
- self.filesystem = 'crypto_LUKS'
+ self.filesystem = filesystem
else:
raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.")
@@ -393,9 +713,9 @@ class Partition:
try:
if options:
- SysCommand(f'/usr/bin/mount -o {options} {self.path} {target}')
+ SysCommand(f"/usr/bin/mount -o {options} {self.path} {target}")
else:
- SysCommand(f'/usr/bin/mount {self.path} {target}')
+ SysCommand(f"/usr/bin/mount {self.path} {target}")
except SysCallError as err:
raise err
@@ -404,7 +724,7 @@ class Partition:
def unmount(self):
try:
- exit_code = SysCommand(f'/usr/bin/umount {self.path}').exit_code
+ SysCommand(f"/usr/bin/umount {self.path}")
except SysCallError as err:
exit_code = err.exit_code
@@ -476,17 +796,89 @@ class Filesystem:
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]:
raise args[1]
- b''.join(SysCommand('sync'))
+ SysCommand('sync')
return True
+ def partuuid_to_index(self, uuid):
+ output = json.loads(SysCommand(f"lsblk --json -o+PARTUUID {self.blockdevice.device}").decode('UTF-8'))
+
+ for device in output['blockdevices']:
+ for index, partition in enumerate(device['children']):
+ if partition['partuuid'].lower() == uuid:
+ return index
+
+ def load_layout(self, layout :dict):
+ from .luks import luks2
+
+ # If the layout tells us to wipe the drive, we do so
+ if layout.get('wipe', False):
+ if self.mode == GPT:
+ if not self.parted_mklabel(self.blockdevice.device, "gpt"):
+ raise KeyError(f"Could not create a GPT label on {self}")
+ elif self.mode == MBR:
+ if not self.parted_mklabel(self.blockdevice.device, "msdos"):
+ raise KeyError(f"Could not create a MSDOS label on {self}")
+
+ # We then iterate the partitions in order
+ for partition in layout.get('partitions', []):
+ # We don't want to re-add an existing partition (those containing a UUID already)
+ if partition.get('format', False) and not partition.get('PARTUUID', None):
+ print("Adding partition....")
+ partition['device_instance'] = self.add_partition(partition.get('type', 'primary'),
+ start=partition.get('start', '1MiB'), # TODO: Revisit sane block starts (4MB for memorycards for instance)
+ end=partition.get('size', '100%'),
+ partition_format=partition.get('filesystem', {}).get('format', 'btrfs'))
+ # TODO: device_instance some times become None
+ # print('Device instance:', partition['device_instance'])
+
+ elif (partition_uuid := partition.get('PARTUUID')) and (partition_instance := self.blockdevice.get_partition(uuid=partition_uuid)):
+ print("Re-using partition_instance:", partition_instance)
+ partition['device_instance'] = partition_instance
+ else:
+ raise ValueError(f"{self}.load_layout() doesn't know how to continue without a new partition definition or a UUID ({partition.get('PARTUUID')}) on the device ({self.blockdevice.get_partition(uuid=partition_uuid)}).")
+
+ if partition.get('filesystem', {}).get('format', False):
+ if partition.get('encrypted', False):
+ if not partition.get('password'):
+ if storage['arguments'] == 'silent':
+ raise ValueError(f"Missing encryption password for {partition['device_instance']}")
+ else:
+ from .user_interaction import get_password
+ partition['password'] = get_password(f"Enter a encryption password for {partition['device_instance']}")
+
+ partition['device_instance'].encrypt(password=partition['password'])
+ with luks2(partition['device_instance'], storage.get('ENC_IDENTIFIER', 'ai')+'loop', partition['password']) as unlocked_device:
+ if not partition.get('format'):
+ if storage['arguments'] == 'silent':
+ raise ValueError(f"Missing fs-type to format on newly created encrypted partition {partition['device_instance']}")
+ else:
+ if not partition.get('filesystem'):
+ partition['filesystem'] = {}
+
+ if not partition['filesystem'].get('format', False):
+ while True:
+ partition['filesystem']['format'] = input(f"Enter a valid fs-type for newly encrypted partition {partition['filesystem']['format']}: ").strip()
+ if not partition['filesystem']['format'] or valid_fs_type(partition['filesystem']['format']) is False:
+ pint("You need to enter a valid fs-type in order to continue. See `man parted` for valid fs-type's.")
+ continue
+ break
+
+ unlocked_device.format(partition['filesystem']['format'])
+ elif partition.get('format', False):
+ partition['device_instance'].format(partition['filesystem']['format'])
+
+ if partition.get('boot', False):
+ self.set(self.partuuid_to_index(partition['device_instance'].uuid), 'boot on')
+
def find_partition(self, mountpoint):
for partition in self.blockdevice:
if partition.target_mountpoint == mountpoint or partition.mountpoint == mountpoint:
return partition
def raw_parted(self, string: str):
- x = SysCommand(f'/usr/bin/parted -s {string}')
- return x
+ if (cmd_handle := SysCommand(f'/usr/bin/parted -s {string}')).exit_code != 0:
+ log(f"Parted ended with a bad exit code: {cmd_handle}", level=logging.ERROR, fg="red")
+ return cmd_handle
def parted(self, string: str):
"""
@@ -495,64 +887,47 @@ class Filesystem:
:param string: A raw string passed to /usr/bin/parted -s <string>
:type string: str
"""
- return self.raw_parted(string).exit_code
-
- def use_entire_disk(self, root_filesystem_type='ext4'):
- log(f"Using and formatting the entire {self.blockdevice}.", level=logging.DEBUG)
- if has_uefi():
- self.add_partition('primary', start='1MiB', end='513MiB', partition_format='fat32')
- self.set_name(0, 'EFI')
- self.set(0, 'boot on')
- # TODO: Probably redundant because in GPT mode 'esp on' is an alias for "boot on"?
- # https://www.gnu.org/software/parted/manual/html_node/set.html
- self.set(0, 'esp on')
- self.add_partition('primary', start='513MiB', end='100%')
-
- self.blockdevice.partition[0].filesystem = 'vfat'
- self.blockdevice.partition[1].filesystem = root_filesystem_type
- log(f"Set the root partition {self.blockdevice.partition[1]} to use filesystem {root_filesystem_type}.", level=logging.DEBUG)
-
- self.blockdevice.partition[0].target_mountpoint = '/boot'
- self.blockdevice.partition[1].target_mountpoint = '/'
-
- self.blockdevice.partition[0].allow_formatting = True
- self.blockdevice.partition[1].allow_formatting = True
- else:
- # we don't need a seprate boot partition it would be a waste of space
- self.add_partition('primary', start='1MB', end='100%')
- self.blockdevice.partition[0].filesystem = root_filesystem_type
- log(f"Set the root partition {self.blockdevice.partition[0]} to use filesystem {root_filesystem_type}.", level=logging.DEBUG)
- self.blockdevice.partition[0].target_mountpoint = '/'
- self.blockdevice.partition[0].allow_formatting = True
+ return self.raw_parted(string).exit_code == 0
+
+ def use_entire_disk(self, root_filesystem_type='ext4') -> Partition:
+ # TODO: Implement this with declarative profiles instead.
+ raise ValueError("Installation().use_entire_disk() has to be re-worked.")
def add_partition(self, partition_type, start, end, partition_format=None):
- log(f'Adding partition to {self.blockdevice}', level=logging.INFO)
+ log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO)
+
+ previous_partition_uuids = {partition.uuid for partition in self.blockdevice.partitions.values()}
- previous_partitions = self.blockdevice.partitions
if self.mode == MBR:
if len(self.blockdevice.partitions) > 3:
DiskError("Too many partitions on disk, MBR disks can only have 3 parimary partitions")
+
if partition_format:
- partitioning = self.parted(f'{self.blockdevice.device} mkpart {partition_type} {partition_format} {start} {end}') == 0
+ parted_string = f'{self.blockdevice.device} mkpart {partition_type} {partition_format} {start} {end}'
else:
- partitioning = self.parted(f'{self.blockdevice.device} mkpart {partition_type} {start} {end}') == 0
+ parted_string = f'{self.blockdevice.device} mkpart {partition_type} {start} {end}'
- if partitioning:
+ if self.parted(parted_string):
start_wait = time.time()
- while previous_partitions == self.blockdevice.partitions:
- time.sleep(0.025) # Let the new partition come up in the kernel
+ while previous_partition_uuids == {partition.uuid for partition in self.blockdevice.partitions.values()}:
if time.time() - start_wait > 10:
raise DiskError(f"New partition never showed up after adding new partition on {self} (timeout 10 seconds).")
+ time.sleep(0.025)
+
+
+ time.sleep(0.5) # Let the kernel catch up with quick block devices (nvme for instance)
+ return self.blockdevice.get_partition(uuid=(previous_partition_uuids ^ {partition.uuid for partition in self.blockdevice.partitions.values()}).pop())
- return True
def set_name(self, partition: int, name: str):
return self.parted(f'{self.blockdevice.device} name {partition + 1} "{name}"') == 0
def set(self, partition: int, string: str):
+ log(f"Setting {string} on (parted) partition index {partition+1}", level=logging.INFO)
return self.parted(f'{self.blockdevice.device} set {partition + 1} {string}') == 0
def parted_mklabel(self, device: str, disk_label: str):
+ log(f"Creating a new partition labling on {device}", level=logging.INFO, fg="yellow")
# Try to unmount devices before attempting to run mklabel
try:
SysCommand(f'bash -c "umount {device}?"')
@@ -582,13 +957,15 @@ def device_state(name, *args, **kwargs):
# lsblk --json -l -n -o path
def all_disks(*args, **kwargs):
kwargs.setdefault("partitions", False)
- drives = OrderedDict()
- # for drive in json.loads(sys_command(f'losetup --json', *args, **lkwargs, hide_from_log=True)).decode('UTF_8')['loopdevices']:
- for drive in json.loads(b''.join(SysCommand('lsblk --json -l -n -o path,size,type,mountpoint,label,pkname,model')).decode('UTF_8'))['blockdevices']:
+ drives = {}
+
+ lsblk = json.loads(SysCommand('lsblk --json -l -n -o path,size,type,mountpoint,label,pkname,model').decode('UTF_8'))
+ for drive in lsblk['blockdevices']:
if not kwargs['partitions'] and drive['type'] == 'part':
continue
drives[drive['path']] = BlockDevice(drive['path'], drive)
+
return drives
@@ -617,12 +994,10 @@ def harddrive(size=None, model=None, fuzzy=False):
def get_mount_info(path) -> dict:
try:
- output = SysCommand(f'/usr/bin/findmnt --json {path}')
+ output = SysCommand(f'/usr/bin/findmnt --json {path}').decode('UTF-8')
except SysCallError:
return {}
- output = output.decode('UTF-8')
-
if not output:
return {}
@@ -636,14 +1011,12 @@ def get_mount_info(path) -> dict:
def get_partitions_in_use(mountpoint) -> list:
try:
- output = SysCommand(f'/usr/bin/findmnt --json -R {mountpoint}')
+ output = SysCommand(f"/usr/bin/findmnt --json -R {mountpoint}").decode('UTF-8')
except SysCallError:
return []
mounts = []
- output = output.decode('UTF-8')
-
if not output:
return []
@@ -659,16 +1032,26 @@ def get_partitions_in_use(mountpoint) -> list:
def get_filesystem_type(path):
try:
- handle = SysCommand(f"blkid -o value -s TYPE {path}")
- return b''.join(handle).strip().decode('UTF-8')
+ return SysCommand(f"blkid -o value -s TYPE {path}").decode('UTF-8').strip()
except SysCallError:
return None
def disk_layouts():
try:
- handle = SysCommand("lsblk -f -o+TYPE,SIZE -J")
- return json.loads(b''.join(handle).decode('UTF-8'))
+ return json.loads(SysCommand("lsblk -f -o+TYPE,SIZE -J").decode('UTF-8'))
except SysCallError as err:
log(f"Could not return disk layouts: {err}")
return None
+
+
+def encrypted_partitions(blockdevices :dict) -> bool:
+ for partition in blockdevices.values():
+ if partition.get('encrypted', False):
+ yield partition
+
+def find_partition_by_mountpoint(block_devices, relative_mountpoint :str):
+ for device in block_devices:
+ for partition in block_devices[device]['partitions']:
+ if partition.get('mountpoint', None) == relative_mountpoint:
+ return partition \ No newline at end of file
diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py
index b9dc66ab..f0be7972 100644
--- a/archinstall/lib/general.py
+++ b/archinstall/lib/general.py
@@ -71,6 +71,8 @@ def locate_binary(name):
raise RequirementError(f"Binary {name} does not exist.")
+def json_dumps(*args, **kwargs):
+ return json.dumps(*args, **{**kwargs, 'cls': JSON})
class JsonEncoder:
def _encode(obj):
@@ -113,7 +115,6 @@ class JSON(json.JSONEncoder, json.JSONDecoder):
def encode(self, obj):
return super(JSON, self).encode(self._encode(obj))
-
class SysCommandWorker:
def __init__(self, cmd, callbacks=None, peak_output=False, environment_vars=None, logfile=None, working_directory='./'):
if not callbacks:
@@ -321,6 +322,15 @@ class SysCommand:
for line in self.session:
yield line
+ def __getitem__(self, key):
+ if type(key) is slice:
+ start = key.start if key.start else 0
+ end = key.stop if key.stop else len(self.session._trace_log)
+
+ return self.session._trace_log[start:end]
+ else:
+ raise ValueError("SysCommand() doesn't have key & value pairs, only slices, SysCommand('ls')[:10] as an example.")
+
def __repr__(self, *args, **kwargs):
return self.session._trace_log.decode('UTF-8')
diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py
index da6f6a9b..00d3a001 100644
--- a/archinstall/lib/installer.py
+++ b/archinstall/lib/installer.py
@@ -57,7 +57,6 @@ class Installer:
self.post_base_install = []
storage['session'] = self
- self.partitions = get_partitions_in_use(self.target)
self.MODULES = []
self.BINARIES = []
@@ -108,6 +107,10 @@ class Installer:
self.sync_log_to_install_medium()
return False
+ @property
+ def partitions(self):
+ return get_partitions_in_use(self.target)
+
def sync_log_to_install_medium(self):
# Copy over the install log (if there is one) to the install medium if
# at least the base has been strapped in, otherwise we won't have a filesystem/structure to copy to.
@@ -122,6 +125,23 @@ class Installer:
return True
+ def mount_ordered_layout(self, layouts :dict):
+ from .luks import luks2
+
+ mountpoints = {}
+ for blockdevice in layouts:
+ for partition in layouts[blockdevice]['partitions']:
+ mountpoints[partition['mountpoint']] = partition
+
+ for mountpoint in sorted(mountpoints.keys()):
+ if mountpoints[mountpoint]['encrypted']:
+ loopdev = storage.get('ENC_IDENTIFIER', 'ai')+'loop'
+ password = mountpoints[mountpoint]['password']
+ with luks2(mountpoints[mountpoint]['device_instance'], loopdev, password, auto_unmount=False) as unlocked_device:
+ unlocked_device.mount(f"{self.target}{mountpoint}")
+ else:
+ mountpoints[mountpoint]['device_instance'].mount(f"{self.target}{mountpoint}")
+
def mount(self, partition, mountpoint, create_mountpoint=True):
if create_mountpoint and not os.path.isdir(f'{self.target}{mountpoint}'):
os.makedirs(f'{self.target}{mountpoint}')
@@ -425,6 +445,9 @@ class Installer:
elif partition.mountpoint == self.target:
root_partition = partition
+ if boot_partition is None and root_partition is None:
+ raise ValueError(f"Could not detect root (/) or boot (/boot) in {self.target} based on: {self.partitions}")
+
self.log(f'Adding bootloader {bootloader} to {boot_partition if boot_partition else root_partition}', level=logging.INFO)
if bootloader == 'systemd-bootctl':
diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py
index b910bfb2..781bed43 100644
--- a/archinstall/lib/luks.py
+++ b/archinstall/lib/luks.py
@@ -18,9 +18,6 @@ class luks2:
self.mapdev = None
def __enter__(self):
- # if self.partition.allow_formatting:
- # self.key_file = self.encrypt(self.partition, *self.args, **self.kwargs)
- # else:
if not self.key_file:
self.key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw" # TODO: Make disk-pw-file randomly unique?
@@ -42,9 +39,6 @@ class luks2:
return True
def encrypt(self, partition, password=None, key_size=512, hash_type='sha512', iter_time=10000, key_file=None):
- if not self.partition.allow_formatting:
- raise DiskError(f'Could not encrypt volume {partition} due to it having a formatting lock.')
-
log(f'Encrypting {partition} (This might take a while)', level=logging.INFO)
if not key_file:
@@ -132,7 +126,6 @@ class luks2:
if os.path.islink(f'/dev/mapper/{mountpoint}'):
self.mapdev = f'/dev/mapper/{mountpoint}'
unlocked_partition = Partition(self.mapdev, None, encrypted=True, filesystem=get_filesystem_type(self.mapdev), autodetect_filesystem=False)
- unlocked_partition.allow_formatting = self.partition.allow_formatting
return unlocked_partition
def close(self, mountpoint=None):
diff --git a/archinstall/lib/storage.py b/archinstall/lib/storage.py
index 4e19e4d4..67f8e716 100644
--- a/archinstall/lib/storage.py
+++ b/archinstall/lib/storage.py
@@ -18,5 +18,6 @@ storage = {
'PROFILE_DB': None, # Used in cases when listing profiles is desired, not mandatory for direct profile grabing.
'LOG_PATH': '/var/log/archinstall',
'LOG_FILE': 'install.log',
- 'MOUNT_POINT': '/mnt',
+ 'MOUNT_POINT': '/mnt/archinstall',
+ 'ENC_IDENTIFIER': 'ainst'
}
diff --git a/archinstall/lib/user_interaction.py b/archinstall/lib/user_interaction.py
index 3c1ba04f..dadc111f 100644
--- a/archinstall/lib/user_interaction.py
+++ b/archinstall/lib/user_interaction.py
@@ -9,6 +9,7 @@ import signal
import sys
import time
+from .disk import BlockDevice, valid_fs_type, find_partition_by_mountpoint, suggest_single_disk_layout, suggest_multi_disk_layout, valid_parted_position
from .exceptions import *
from .general import SysCommand
from .hardware import AVAILABLE_GFX_DRIVERS, has_uefi, has_amd_graphics, has_intel_graphics, has_nvidia_graphics
@@ -119,7 +120,7 @@ def print_large_list(options, padding=5, margin_bottom=0, separator=': '):
def generic_multi_select(options, text="Select one or more of the options above (leave blank to continue): ", sort=True, default=None, allow_empty=False):
# Checking if the options are different from `list` or `dict` or if they are empty
- if type(options) not in [list, dict]:
+ if type(options) not in [list, dict, type({}.keys()), type({}.values())]:
log(f" * Generic multi-select doesn't support ({type(options)}) as type of options * ", fg='red')
log(" * If problem persists, please create an issue on https://github.com/archlinux/archinstall/issues * ", fg='yellow')
raise RequirementError("generic_multi_select() requires list or dictionary as options.")
@@ -130,6 +131,8 @@ def generic_multi_select(options, text="Select one or more of the options above
# After passing the checks, function continues to work
if type(options) == dict:
options = list(options.values())
+ elif type(options) in (type({}.keys()), type({}.values())):
+ options = list(options)
if sort:
options = sorted(options)
@@ -186,8 +189,23 @@ def generic_multi_select(options, text="Select one or more of the options above
except RequirementError as e:
log(f" * {e} * ", fg='red')
+ sys.stdout.write('\n')
+ sys.stdout.flush()
return selected_options
+def select_encrypted_partitions(block_devices :dict, password :str) -> dict:
+ root = find_partition_by_mountpoint(block_devices, '/')
+ root['encrypted'] = True
+ root['password'] = password
+
+ return block_devices
+
+ # TODO: Next version perhaps we can support multiple encrypted partitions
+ #options = []
+ #for partition in block_devices.values():
+ # options.append({key: val for key, val in partition.items() if val})
+
+ #print(generic_multi_select(options, f"Choose which partitions to encrypt (leave blank when done): "))
class MiniCurses:
def __init__(self, width, height):
@@ -536,6 +554,221 @@ def generic_select(options, input_text="Select one of the above by index or abso
return selected_option
+def partition_overlap(partitions :list, start :str, end :str) -> bool:
+ # TODO: Implement sanity check
+ return False
+
+def get_default_partition_layout(block_devices):
+ if len(block_devices) == 1:
+ return suggest_single_disk_layout(block_devices[0])
+ else:
+ return suggest_multi_disk_layout(block_devices)
+
+ # TODO: Implement sane generic layout for 2+ drives
+
+def manage_new_and_existing_partitions(block_device :BlockDevice) -> dict:
+ if has_uefi():
+ partition_type = 'gpt'
+ else:
+ partition_type = 'msdos'
+
+ # log(f"Selecting which partitions to re-use on {block_device}...", fg="yellow", level=logging.INFO)
+ # partitions = generic_multi_select(block_device.partitions.values(), "Select which partitions to re-use (the rest will be left alone): ", sort=True)
+ # partitions_to_wipe = generic_multi_select(partitions, "Which partitions do you wish to wipe (multiple can be selected): ", sort=True)
+
+ # mountpoints = {}
+ # struct = {
+ # "partitions" : []
+ # }
+ # for partition in partitions:
+ # mountpoint = input(f"Select a mountpoint (or skip) for {partition}: ").strip()
+
+ # part_struct = {}
+ # if mountpoint:
+ # part_struct['mountpoint'] = mountpoint
+ # if mountpoint == '/boot':
+ # part_struct['boot'] = True
+ # if has_uefi():
+ # part_struct['ESP'] = True
+ # elif mountpoint == '/' and
+ # if partition.uuid:
+ # part_struct['PARTUUID'] = partition.uuid
+ # if partition in partitions_to_wipe:
+ # part_struct['wipe'] = True
+
+ # struct['partitions'].append(part_struct)
+
+ # return struct
+
+ mountpoints = {}
+ block_device_struct = {
+ "partitions" : [partition.__dump__() for partition in block_device.partitions.values()]
+ }
+ # Test code: [part.__dump__() for part in block_device.partitions.values()]
+ # TODO: Squeeze in BTRFS subvolumes here
+
+ while True:
+ modes = [
+ "Create a new partition",
+ f"Suggest partition layout for {block_device}",
+ "Delete a partition" if len(block_device_struct) else "",
+ "Clear/Delete all partitions" if len(block_device_struct) else "",
+ "Assign mount-point for a partition" if len(block_device_struct) else "",
+ "Mark/Unmark a partition to be formatted (wipes data)" if len(block_device_struct) else "",
+ "Mark/Unmark a partition as encrypted" if len(block_device_struct) else "",
+ "Mark/Unmark a partition as bootable (automatic for /boot)" if len(block_device_struct) else "",
+ "Set desired filesystem for a partition" if len(block_device_struct) else "",
+ ]
+
+ # Print current partition layout:
+ if len(block_device_struct["partitions"]):
+ print('Current partition layout:')
+ for partition in block_device_struct["partitions"]:
+ print(partition)
+ print()
+
+ task = generic_select(modes,
+ input_text=f"Select what to do with {block_device} (leave blank when done): ")
+
+ if not task:
+ break
+
+ if task == 'Create a new partition':
+ if partition_type == 'gpt':
+ # https://www.gnu.org/software/parted/manual/html_node/mkpart.html
+ # https://www.gnu.org/software/parted/manual/html_node/mklabel.html
+ name = input("Enter a desired name for the partition: ").strip()
+
+ fstype = input("Enter a desired filesystem type for the partition: ").strip()
+
+ start = input(f"Enter the start sector (percentage or block number, default: {block_device.largest_free_space[0]}): ").strip()
+ if not start.strip():
+ start = block_device.largest_free_space[0]
+ end_suggested = block_device.largest_free_space[1]
+ else:
+ end_suggested = '100%'
+ end = input(f"Enter the end sector of the partition (percentage or block number, ex: {end_suggested}): ").strip()
+ if not end.strip():
+ end = end_suggested
+
+ if valid_parted_position(start) and valid_parted_position(end) and valid_fs_type(fstype):
+ if partition_overlap(block_device_struct["partitions"], start, end):
+ log(f"This partition overlaps with other partitions on the drive! Ignoring this partition creation.", fg="red")
+ continue
+
+ block_device_struct["partitions"].append({
+ "type" : "primary", # Strictly only allowed under MSDOS, but GPT accepts it so it's "safe" to inject
+ "start" : start,
+ "size" : end,
+ "mountpoint" : None,
+ "wipe" : True,
+ "filesystem" : {
+ "format" : fstype
+ }
+ })
+ else:
+ log(f"Invalid start ({valid_parted_position(start)}), end ({valid_parted_position(end)}) or fstype ({valid_fs_type(fstype)}) for this partition. Ignoring this partition creation.", fg="red")
+ continue
+ elif task[:len("Suggest partition layout")] == "Suggest partition layout":
+ if len(block_device_struct["partitions"]):
+ if input(f"{block_device} contains queued partitions, this will remove those, are you sure? y/N: ").strip().lower() in ('', 'n'):
+ continue
+
+ block_device_struct["partitions"] = suggest_single_disk_layout(block_device)[block_device]
+ elif task is None:
+ return block_device_struct
+ else:
+ for index, partition in enumerate(block_device_struct["partitions"]):
+ print(f"{index}: Start: {partition['start']}, End: {partition['size']} ({partition['filesystem']['format']}{', mounting at: '+partition['mountpoint'] if partition['mountpoint'] else ''})")
+
+ if task == "Delete a partition":
+ if (partition := generic_select(block_device_struct["partitions"], 'Select which partition to delete: ', options_output=False)):
+ del(block_device_struct["partitions"][block_device_struct["partitions"].index(partition)])
+ elif task == "Clear/Delete all partitions":
+ block_device_struct["partitions"] = []
+ elif task == "Assign mount-point for a partition":
+ if (partition := generic_select(block_device_struct["partitions"], 'Select which partition to mount where: ', options_output=False)):
+ print(' * Partition mount-points are relative to inside the installation, the boot would be /boot as an example.')
+ mountpoint = input('Select where to mount partition (leave blank to remove mountpoint): ').strip()
+
+ if len(mountpoint):
+ block_device_struct["partitions"][block_device_struct["partitions"].index(partition)]['mountpoint'] = mountpoint
+ if mountpoint == '/boot':
+ log(f"Marked partition as bootable because mountpoint was set to /boot.", fg="yellow")
+ block_device_struct["partitions"][block_device_struct["partitions"].index(partition)]['boot'] = True
+ else:
+ del(block_device_struct["partitions"][block_device_struct["partitions"].index(partition)]['mountpoint'])
+
+ elif task == "Mark/Unmark a partition to be formatted (wipes data)":
+ if (partition := generic_select(block_device_struct["partitions"], 'Select which partition to mask for formatting: ', options_output=False)):
+ # If we mark a partition for formatting, but the format is CRYPTO LUKS, there's no point in formatting it really
+ # without asking the user which inner-filesystem they want to use. Since the flag 'encrypted' = True is already set,
+ # it's safe to change the filesystem for this partition.
+ if block_device_struct["partitions"][block_device_struct["partitions"].index(partition)].get('filesystem', {}).get('format', 'crypto_LUKS') == 'crypto_LUKS':
+ if not block_device_struct["partitions"][block_device_struct["partitions"].index(partition)].get('filesystem', None):
+ block_device_struct["partitions"][block_device_struct["partitions"].index(partition)]['filesystem'] = {}
+
+ while True:
+ fstype = input("Enter a desired filesystem type for the partition: ").strip()
+ if not valid_fs_type(fstype):
+ log(f"Desired filesystem {fstype} is not a valid filesystem.", level=logging.ERROR, fg="red")
+ continue
+ break
+
+ block_device_struct["partitions"][block_device_struct["partitions"].index(partition)]['filesystem']['format'] = fstype
+
+ # Negate the current wipe marking
+ block_device_struct["partitions"][block_device_struct["partitions"].index(partition)]['format'] = not block_device_struct["partitions"][block_device_struct["partitions"].index(partition)].get('format', False)
+
+ elif task == "Mark/Unmark a partition as encrypted":
+ if (partition := generic_select(block_device_struct["partitions"], 'Select which partition to mark as encrypted: ', options_output=False)):
+ # Negate the current encryption marking
+ block_device_struct["partitions"][block_device_struct["partitions"].index(partition)]['encrypted'] = not block_device_struct["partitions"][block_device_struct["partitions"].index(partition)].get('encrypted', False)
+
+ elif task == "Mark/Unmark a partition as bootable (automatic for /boot)":
+ if (partition := generic_select(block_device_struct["partitions"], 'Select which partition to mark as bootable: ', options_output=False)):
+ block_device_struct["partitions"][block_device_struct["partitions"].index(partition)]['boot'] = not block_device_struct["partitions"][block_device_struct["partitions"].index(partition)].get('boot', False)
+
+ elif task == "Set desired filesystem for a partition":
+ if (partition := generic_select(block_device_struct["partitions"], 'Select which partition to set a filesystem on: ', options_output=False)):
+ if not block_device_struct["partitions"][block_device_struct["partitions"].index(partition)].get('filesystem', None):
+ block_device_struct["partitions"][block_device_struct["partitions"].index(partition)]['filesystem'] = {}
+
+ while True:
+ fstype = input("Enter a desired filesystem type for the partition: ").strip()
+ if not valid_fs_type(fstype):
+ log(f"Desired filesystem {fstype} is not a valid filesystem.", level=logging.ERROR, fg="red")
+ continue
+ break
+
+ block_device_struct["partitions"][block_device_struct["partitions"].index(partition)]['filesystem']['format'] = fstype
+
+ return block_device_struct
+
+def select_individual_blockdevice_usage(block_devices :list):
+ result = {}
+
+ for device in block_devices:
+ layout = manage_new_and_existing_partitions(device)
+
+ result[device] = layout
+
+ return result
+
+
+def select_disk_layout(block_devices :list):
+ modes = [
+ "Wipe all selected drives and use a best-effort default partition layout",
+ "Select what to do with each individual drive (followed by partition usage)"
+ ]
+
+ mode = generic_select(modes, input_text=f"Select what you wish to do with the selected block devices: ")
+
+ if mode == 'Wipe all selected drives and use a best-effort default partition layout':
+ return get_default_partition_layout(block_devices)
+ else:
+ return select_individual_blockdevice_usage(block_devices)
+
def select_disk(dict_o_disks):
"""