from __future__ import annotations

from typing import List, Any, Dict, Optional, Union, TYPE_CHECKING, Callable

from ..menu import Menu
from ..output import log

from ..disk.validators import fs_types
from ..disk.helpers import has_mountpoint

if TYPE_CHECKING:
    from ..disk import BlockDevice
    from ..disk.partition import Partition
    _: Any


def partition_overlap(partitions: list, start: str, end: str) -> bool:
    """Report whether the range ``start``..``end`` overlaps an existing partition.

    Currently a placeholder: no check is performed and ``False`` is always
    returned.
    """
    # TODO: Implement sanity check
    return False


def _current_partition_layout(partitions: List[Partition], with_idx: bool = False) -> str:
    """Render ``partitions`` as a plain-text table preceded by a translated title.

    Each partition is read through ``.items()`` / ``in`` / ``[]``, i.e. it is
    treated as a mapping of attribute name to value. The table has one column
    per attribute name seen in any partition (first-seen order); a partition
    lacking an attribute gets a blank cell.

    :param partitions: the partition mappings to display
    :param with_idx: when True, prepend an ``index`` column holding each
        partition's position in ``partitions``
    :return: the title followed by the header row, a dashed rule and one row
        per partition
    """
    def do_padding(name, max_len):
        # Centre ``name`` in a cell of ``max_len + 2`` characters; an odd
        # leftover space goes in front of the value. The cell ends with '|'.
        spaces = abs(len(str(name)) - max_len) + 2
        trailing = spaces // 2
        leading = spaces - trailing
        return f'{leading * " "}{name}{trailing * " "}|'

    column_names = {}

    # this will add an initial index to the table for each partition
    if with_idx:
        column_names['index'] = max(len(str(len(partitions))), len('index'))

    # determine all attribute names and the max length
    # of the value among all partitions to know the width
    # of the table cells
    for p in partitions:
        for attribute, value in p.items():
            width = max(len(str(value)), len(attribute))
            column_names[attribute] = max(column_names.get(attribute, 0), width)

    header = ''.join(do_padding(name, max_len) for name, max_len in column_names.items())

    # The rule is as long as the header *including* its trailing '|', which
    # is stripped from the header line itself.
    lines = [f'{header[:-1]}\n{"-" * len(header)}\n']

    for idx, p in enumerate(partitions):
        cells = []
        for name, max_len in column_names.items():
            if name == 'index':
                cells.append(do_padding(str(idx), max_len))
            elif name in p:
                cells.append(do_padding(p[name], max_len))
            else:
                cells.append(' ' * (max_len + 2) + '|')

        row = ''.join(cells)
        lines.append(f'{row[:-1]}\n')

    current_layout = ''.join(lines)
    title = str(_('Current partition layout'))
    return f'\n\n{title}:\n\n{current_layout}'


def select_partition(
    title: str,
    partitions: List[Partition],
    multiple: bool = False,
    filter: Optional[Callable] = None
) -> Union[int, List[int], None]:
    """Let the user pick one or several partitions by index through a Menu.

    :param title: text shown above the menu
    :param partitions: the candidate partitions; their positions are offered
        as the menu entries
    :param multiple: passed to the Menu as ``multi``
    :param filter: optional predicate called with each partition; only
        partitions for which it returns a truthy value are offered
    :return: ``None`` when nothing can be offered or the menu returns
        ``None``; a list of indexes when the menu returns a list; otherwise
        a single index
    """
    partition_indexes = [
        str(idx)
        for idx, candidate in enumerate(partitions)
        if not filter or filter(candidate)
    ]

    if not partition_indexes:
        return None

    selection = Menu(title, partition_indexes, multi=multiple).run()

    if selection is None:
        return None

    if isinstance(selection, list):
        return [int(entry) for entry in selection]

    return int(selection)


def get_default_partition_layout(
    block_devices: Union['BlockDevice', List['BlockDevice']],
    advanced_options: bool = False
) -> Dict[str, Any]:
    """Return a suggested layout for one device or for several devices.

    Delegates to ``suggest_single_disk_layout`` when exactly one device is
    given and to ``suggest_multi_disk_layout`` otherwise.
    """
    from ..disk import suggest_single_disk_layout, suggest_multi_disk_layout

    # NOTE(review): the annotation also allows a bare BlockDevice, but the
    # code below relies on len() and indexing - confirm that callers always
    # pass a sequence (or that BlockDevice supports both).
    if len(block_devices) == 1:
        return suggest_single_disk_layout(block_devices[0], advanced_options=advanced_options)

    return suggest_multi_disk_layout(block_devices, advanced_options=advanced_options)


def select_individual_blockdevice_usage(block_devices: list) -> Dict[str, Any]:
    """Interactively configure every device; return the layouts keyed by ``device.path``."""
    return {
        device.path: manage_new_and_existing_partitions(device)
        for device in block_devices
    }


def _ensure_filesystem(partition: Dict[str, Any]) -> Dict[str, Any]:
    """Return ``partition['filesystem']``, creating an empty dict if it is missing or falsy."""
    if not partition.get('filesystem'):
        partition['filesystem'] = {}
    return partition['filesystem']


def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, Any]:  # noqa: max-complexity: 50
    """Run the interactive partition editor for ``block_device``.

    Starts from the dump of the device's current partitions and loops over a
    menu of actions (create, suggest layout, delete, assign mount-point,
    toggle format/encrypted/bootable, enable compression, set filesystem,
    set btrfs subvolumes) until the menu returns a falsy value.

    :param block_device: the device whose partitions are being edited
    :return: a dict with a ``"partitions"`` list describing the desired layout
    """
    block_device_struct = {"partitions": [partition.__dump__() for partition in block_device.partitions.values()]}
    # TODO: Squeeze in BTRFS subvolumes here

    new_partition = str(_('Create a new partition'))
    suggest_partition_layout = str(_('Suggest partition layout'))
    delete_partition = str(_('Delete a partition'))
    delete_all_partitions = str(_('Clear/Delete all partitions'))
    assign_mount_point = str(_('Assign mount-point for a partition'))
    mark_formatted = str(_('Mark/Unmark a partition to be formatted (wipes data)'))
    mark_encrypted = str(_('Mark/Unmark a partition as encrypted'))
    mark_compressed = str(_('Mark/Unmark a partition as compressed (btrfs only)'))
    mark_bootable = str(_('Mark/Unmark a partition as bootable (automatic for /boot)'))
    set_filesystem_partition = str(_('Set desired filesystem for a partition'))
    set_btrfs_subvolumes = str(_('Set desired subvolumes on a btrfs partition'))

    while True:
        modes = [new_partition, suggest_partition_layout]

        # The per-partition actions only make sense once something is queued.
        if block_device_struct['partitions']:
            modes += [
                delete_partition,
                delete_all_partitions,
                assign_mount_point,
                mark_formatted,
                mark_encrypted,
                mark_bootable,
                mark_compressed,
                set_filesystem_partition,
                set_btrfs_subvolumes,
            ]

        title = _('Select what to do with\n{}').format(block_device)

        # show current partition layout:
        if block_device_struct["partitions"]:
            title += _current_partition_layout(block_device_struct['partitions']) + '\n'

        task = Menu(title, modes, sort=False).run()

        # A falsy result (including None) ends the editing session.
        if not task:
            break

        if task == new_partition:
            from ..disk import valid_parted_position

            # A partition name is not asked for (it would only apply to GPT):
            # https://www.gnu.org/software/parted/manual/html_node/mkpart.html
            # https://www.gnu.org/software/parted/manual/html_node/mklabel.html

            fstype = Menu(_('Enter a desired filesystem type for the partition'), fs_types(), skip=False).run()

            prompt = _('Enter the start sector (percentage or block number, default: {}): ').format(
                block_device.first_free_sector)
            start = input(prompt).strip()

            if not start:
                start = block_device.first_free_sector
                end_suggested = block_device.first_end_sector
            else:
                end_suggested = '100%'

            prompt = _('Enter the end sector of the partition (percentage or block number, ex: {}): ').format(
                end_suggested)
            end = input(prompt).strip()

            if not end:
                end = end_suggested

            # Validate once and reuse the results in the error message.
            start_valid = valid_parted_position(start)
            end_valid = valid_parted_position(end)

            if not (start_valid and end_valid):
                log(f"Invalid start ({start_valid}) or end ({end_valid}) for this partition. Ignoring this partition creation.", fg="red")
                continue

            if partition_overlap(block_device_struct["partitions"], start, end):
                log("This partition overlaps with other partitions on the drive! Ignoring this partition creation.", fg="red")
                continue

            block_device_struct["partitions"].append({
                "type": "primary",  # Strictly only allowed under MSDOS, but GPT accepts it so it's "safe" to inject
                "start": start,
                "size": end,
                "mountpoint": None,
                "wipe": True,
                "filesystem": {
                    "format": fstype
                }
            })

        elif task == suggest_partition_layout:
            from ..disk import suggest_single_disk_layout

            if block_device_struct["partitions"]:
                prompt = _('{} contains queued partitions, this will remove those, are you sure?').format(block_device)
                choice = Menu(prompt, ['yes', 'no'], default_option='no').run()

                if choice == 'no':
                    continue

            block_device_struct.update(suggest_single_disk_layout(block_device)[block_device.path])

        else:
            current_layout = _current_partition_layout(block_device_struct['partitions'], with_idx=True)

            if task == delete_partition:
                title = _('{}\n\nSelect by index which partitions to delete').format(current_layout)
                to_delete = select_partition(title, block_device_struct["partitions"], multiple=True)

                if to_delete:
                    doomed = set(to_delete)
                    block_device_struct['partitions'] = [
                        p for idx, p in enumerate(block_device_struct['partitions']) if idx not in doomed
                    ]

            elif task == mark_compressed:
                # The prompt used to be a copy of the "bootable" one.
                title = _('{}\n\nSelect which partition to mark as compressed').format(current_layout)
                partition = select_partition(title, block_device_struct["partitions"])

                if partition is not None:
                    filesystem = _ensure_filesystem(block_device_struct["partitions"][partition])
                    mount_options = filesystem.setdefault("mount_options", [])

                    if "compress=zstd" not in mount_options:
                        mount_options.append("compress=zstd")

            elif task == delete_all_partitions:
                block_device_struct["partitions"] = []

            elif task == assign_mount_point:
                title = _('{}\n\nSelect by index which partition to mount where').format(current_layout)
                partition = select_partition(title, block_device_struct["partitions"])

                if partition is not None:
                    selected = block_device_struct["partitions"][partition]

                    print(_(' * Partition mount-points are relative to inside the installation, the boot would be /boot as an example.'))
                    mountpoint = input(_('Select where to mount partition (leave blank to remove mountpoint): ')).strip()

                    if mountpoint:
                        selected['mountpoint'] = mountpoint
                        if mountpoint == '/boot':
                            log("Marked partition as bootable because mountpoint was set to /boot.", fg="yellow")
                            selected['boot'] = True
                    else:
                        # The partition may never have had a mountpoint key;
                        # removing "nothing" must not raise KeyError.
                        selected.pop('mountpoint', None)

            elif task == mark_formatted:
                title = _('{}\n\nSelect which partition to mask for formatting').format(current_layout)
                partition = select_partition(title, block_device_struct["partitions"])

                if partition is not None:
                    selected = block_device_struct["partitions"][partition]

                    # If we mark a partition for formatting, but the format is CRYPTO LUKS, there's no point in formatting it really
                    # without asking the user which inner-filesystem they want to use. Since the flag 'encrypted' = True is already set,
                    # it's safe to change the filesystem for this partition.
                    # A missing filesystem/format is treated like crypto_LUKS, so the user is asked in that case too.
                    if (selected.get('filesystem') or {}).get('format', 'crypto_LUKS') == 'crypto_LUKS':
                        filesystem = _ensure_filesystem(selected)
                        fstype = Menu(_('Enter a desired filesystem type for the partition'), fs_types(), skip=False).run()
                        filesystem['format'] = fstype

                    # Negate the current wipe marking
                    selected['wipe'] = not selected.get('wipe', False)

            elif task == mark_encrypted:
                title = _('{}\n\nSelect which partition to mark as encrypted').format(current_layout)
                partition = select_partition(title, block_device_struct["partitions"])

                if partition is not None:
                    selected = block_device_struct["partitions"][partition]
                    # Negate the current encryption marking
                    selected['encrypted'] = not selected.get('encrypted', False)

            elif task == mark_bootable:
                title = _('{}\n\nSelect which partition to mark as bootable').format(current_layout)
                partition = select_partition(title, block_device_struct["partitions"])

                if partition is not None:
                    selected = block_device_struct["partitions"][partition]
                    # Negate the current boot marking
                    selected['boot'] = not selected.get('boot', False)

            elif task == set_filesystem_partition:
                title = _('{}\n\nSelect which partition to set a filesystem on').format(current_layout)
                partition = select_partition(title, block_device_struct["partitions"])

                if partition is not None:
                    filesystem = _ensure_filesystem(block_device_struct["partitions"][partition])

                    fstype_title = _('Enter a desired filesystem type for the partition: ')
                    fstype = Menu(fstype_title, fs_types(), skip=False).run()

                    filesystem['format'] = fstype

            elif task == set_btrfs_subvolumes:
                from .subvolume_config import SubvolumeList

                # TODO get preexisting partitions
                title = _('{}\n\nSelect which partition to set subvolumes on').format(current_layout)
                # Only partitions whose filesystem format is btrfs are offered.
                partition = select_partition(
                    title,
                    block_device_struct["partitions"],
                    filter=lambda x: (x.get('filesystem') or {}).get('format') == 'btrfs'
                )

                if partition is not None:
                    selected = block_device_struct["partitions"][partition]

                    if not selected.get('btrfs', {}):
                        selected['btrfs'] = {}
                    if not selected['btrfs'].get('subvolumes', {}):
                        selected['btrfs']['subvolumes'] = {}

                    prev = selected['btrfs']['subvolumes']
                    result = SubvolumeList(_("Manage btrfs subvolumes for current partition"), prev).run()

                    if result:
                        selected['btrfs']['subvolumes'] = result
                    else:
                        # Nothing configured: drop the whole btrfs section.
                        del selected['btrfs']

    return block_device_struct


def select_encrypted_partitions(block_devices: dict, password: str) -> dict:
    """Flag every partition not mounted at ``/boot`` as encrypted with ``password``.

    The partition dicts are modified in place. Flagged partitions for which
    ``has_mountpoint(partition, '/')`` is false are additionally marked with
    ``generate-encryption-key-file``.

    :param block_devices: mapping whose values each hold a ``'partitions'`` list
    :param password: stored under the ``'!password'`` key of each flagged partition
    :return: the same ``block_devices`` mapping
    """
    for layout in block_devices.values():
        for partition in layout['partitions']:
            if partition.get('mountpoint', None) == '/boot':
                continue

            partition['encrypted'] = True
            partition['!password'] = password

            if not has_mountpoint(partition, '/'):
                # Tell the upcoming steps to generate a key-file for non root mounts.
                partition['generate-encryption-key-file'] = True

    return block_devices

    # TODO: Next version perhaps we can support mixed multiple encrypted partitions
    # Users might want to single out a partition for non-encryption to share between dualboot etc.