Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk/filesystem.py
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/disk/filesystem.py')
-rw-r--r--archinstall/lib/disk/filesystem.py622
1 files changed, 351 insertions, 271 deletions
diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py
index 1083df53..5c11896e 100644
--- a/archinstall/lib/disk/filesystem.py
+++ b/archinstall/lib/disk/filesystem.py
@@ -1,301 +1,381 @@
from __future__ import annotations
+
+import signal
+import sys
import time
-import logging
-import json
-import pathlib
-from typing import Optional, Dict, Any, TYPE_CHECKING
-# https://stackoverflow.com/a/39757388/929999
-from ..models.disk_encryption import DiskEncryption
+from pathlib import Path
+from typing import Any, Optional, TYPE_CHECKING, List, Dict, Set
+
+from .device_handler import device_handler
+from .device_model import (
+ DiskLayoutConfiguration, DiskLayoutType, PartitionTable,
+ FilesystemType, DiskEncryption, LvmVolumeGroup,
+ Size, Unit, SectorSize, PartitionModification, EncryptionType,
+ LvmVolume, LvmConfiguration
+)
+from ..hardware import SysInfo
+from ..luks import Luks2
+from ..menu import Menu
+from ..output import debug, info
+from ..general import SysCommand
if TYPE_CHECKING:
- from .blockdevice import BlockDevice
_: Any
-from .partition import Partition
-from .validators import valid_fs_type
-from ..exceptions import DiskError, SysCallError
-from ..general import SysCommand
-from ..output import log
-from ..storage import storage
-GPT = 0b00000001
-MBR = 0b00000010
+class FilesystemHandler:
+ def __init__(
+ self,
+ disk_config: DiskLayoutConfiguration,
+ enc_conf: Optional[DiskEncryption] = None
+ ):
+ self._disk_config = disk_config
+ self._enc_config = enc_conf
+
+ def perform_filesystem_operations(self, show_countdown: bool = True):
+ if self._disk_config.config_type == DiskLayoutType.Pre_mount:
+ debug('Disk layout configuration is set to pre-mount, not performing any operations')
+ return
+
+ device_mods = list(filter(lambda x: len(x.partitions) > 0, self._disk_config.device_modifications))
+
+ if not device_mods:
+ debug('No modifications required')
+ return
+
+ device_paths = ', '.join([str(mod.device.device_info.path) for mod in device_mods])
+
+ # Issue a final warning before we continue with something un-revertable.
+ # We mention the drive one last time, and count from 5 to 0.
+ print(str(_(' ! Formatting {} in ')).format(device_paths))
+
+ if show_countdown:
+ self._do_countdown()
+
+ # Setup the blockdevice, filesystem (and optionally encryption).
+ # Once that's done, we'll hand over to perform_installation()
+ partition_table = PartitionTable.GPT
+ if SysInfo.has_uefi() is False:
+ partition_table = PartitionTable.MBR
+
+ for mod in device_mods:
+ device_handler.partition(mod, partition_table=partition_table)
+
+ if self._disk_config.lvm_config:
+ for mod in device_mods:
+ if boot_part := mod.get_boot_partition():
+ debug(f'Formatting boot partition: {boot_part.dev_path}')
+ self._format_partitions(
+ [boot_part],
+ mod.device_path
+ )
+
+ self.perform_lvm_operations()
+ else:
+ for mod in device_mods:
+ self._format_partitions(
+ mod.partitions,
+ mod.device_path
+ )
-# A sane default is 5MiB, that allows for plenty of buffer for GRUB on MBR
-# but also 4MiB for memory cards for instance. And another 1MiB to avoid issues.
-# (we've been pestered by disk issues since the start, so please let this be here for a few versions)
-DEFAULT_PARTITION_START = '5MiB'
+ for part_mod in mod.partitions:
+ if part_mod.fs_type == FilesystemType.Btrfs:
+ device_handler.create_btrfs_volumes(part_mod, enc_conf=self._enc_config)
-class Filesystem:
- # TODO:
- # When instance of a HDD is selected, check all usages and gracefully unmount them
- # as well as close any crypto handles.
- def __init__(self, blockdevice :BlockDevice, mode :int):
- self.blockdevice = blockdevice
- self.mode = mode
+ def _format_partitions(
+ self,
+ partitions: List[PartitionModification],
+ device_path: Path
+ ):
+ """
+ Format can be given an overriding path, for instance /dev/null to test
+ the formatting functionality and in essence the support for the given filesystem.
+ """
- def __enter__(self, *args :str, **kwargs :str) -> 'Filesystem':
- return self
+ # don't touch existing partitions
+ create_or_modify_parts = [p for p in partitions if p.is_create_or_modify()]
- def __repr__(self) -> str:
- return f"Filesystem(blockdevice={self.blockdevice}, mode={self.mode})"
+ self._validate_partitions(create_or_modify_parts)
- def __exit__(self, *args :str, **kwargs :str) -> bool:
- # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
- if len(args) >= 2 and args[1]:
- raise args[1]
+ # make sure all devices are unmounted
+ device_handler.umount_all_existing(device_path)
- SysCommand('sync')
- return True
+ for part_mod in create_or_modify_parts:
+ # partition will be encrypted
+ if self._enc_config is not None and part_mod in self._enc_config.partitions:
+ device_handler.format_encrypted(
+ part_mod.safe_dev_path,
+ part_mod.mapper_name,
+ part_mod.safe_fs_type,
+ self._enc_config
+ )
+ else:
+ device_handler.format(part_mod.safe_fs_type, part_mod.safe_dev_path)
+
+ # synchronize with udev before using lsblk
+ SysCommand('udevadm settle')
+
+ lsblk_info = device_handler.fetch_part_info(part_mod.safe_dev_path)
+
+ part_mod.partn = lsblk_info.partn
+ part_mod.partuuid = lsblk_info.partuuid
+ part_mod.uuid = lsblk_info.uuid
+
+ def _validate_partitions(self, partitions: List[PartitionModification]):
+ checks = {
+ # verify that all partitions have a path set (which implies that they have been created)
+ lambda x: x.dev_path is None: ValueError('When formatting, all partitions must have a path set'),
+ # crypto luks is not a valid file system type
+ lambda x: x.fs_type is FilesystemType.Crypto_luks: ValueError(
+ 'Crypto luks cannot be set as a filesystem type'),
+ # file system type must be set
+ lambda x: x.fs_type is None: ValueError('File system type must be set for modification')
+ }
+
+ for check, exc in checks.items():
+ found = next(filter(check, partitions), None)
+ if found is not None:
+ raise exc
+
+ def perform_lvm_operations(self):
+ info('Setting up LVM config...')
+
+ if not self._disk_config.lvm_config:
+ return
+
+ if self._enc_config:
+ self._setup_lvm_encrypted(
+ self._disk_config.lvm_config,
+ self._enc_config
+ )
+ else:
+ self._setup_lvm(self._disk_config.lvm_config)
+ self._format_lvm_vols(self._disk_config.lvm_config)
+
+ def _setup_lvm_encrypted(self, lvm_config: LvmConfiguration, enc_config: DiskEncryption):
+ if enc_config.encryption_type == EncryptionType.LvmOnLuks:
+ enc_mods = self._encrypt_partitions(enc_config, lock_after_create=False)
+
+ self._setup_lvm(lvm_config, enc_mods)
+ self._format_lvm_vols(lvm_config)
+
+ # export the lvm group safely otherwise the Luks cannot be closed
+ self._safely_close_lvm(lvm_config)
+
+ for luks in enc_mods.values():
+ luks.lock()
+ elif enc_config.encryption_type == EncryptionType.LuksOnLvm:
+ self._setup_lvm(lvm_config)
+ enc_vols = self._encrypt_lvm_vols(lvm_config, enc_config, False)
+ self._format_lvm_vols(lvm_config, enc_vols)
+
+ for luks in enc_vols.values():
+ luks.lock()
+
+ self._safely_close_lvm(lvm_config)
+
+ def _safely_close_lvm(self, lvm_config: LvmConfiguration):
+ for vg in lvm_config.vol_groups:
+ for vol in vg.volumes:
+ device_handler.lvm_vol_change(vol, False)
+
+ device_handler.lvm_export_vg(vg)
+
+ def _setup_lvm(
+ self,
+ lvm_config: LvmConfiguration,
+ enc_mods: Dict[PartitionModification, Luks2] = {}
+ ):
+ self._lvm_create_pvs(lvm_config, enc_mods)
+
+ for vg in lvm_config.vol_groups:
+ pv_dev_paths = self._get_all_pv_dev_paths(vg.pvs, enc_mods)
+
+ device_handler.lvm_vg_create(pv_dev_paths, vg.name)
+
+ # figure out what the actual available size in the group is
+ vg_info = device_handler.lvm_group_info(vg.name)
+
+ if not vg_info:
+ raise ValueError('Unable to fetch VG info')
+
+ # the actual available LVM Group size will be smaller than the
+ # total PVs size due to reserved metadata storage etc.
+ # so we'll have a look at the total avail. size, check the delta
+ # to the desired sizes and subtract some equally from the actually
+ # created volume
+ avail_size = vg_info.vg_size
+ desired_size = sum([vol.length for vol in vg.volumes], Size(0, Unit.B, SectorSize.default()))
+
+ delta = desired_size - avail_size
+ max_vol_offset = delta.convert(Unit.B)
+
+ max_vol = max(vg.volumes, key=lambda x: x.length)
+
+ for lv in vg.volumes:
+ offset = max_vol_offset if lv == max_vol else None
+
+ debug(f'vg: {vg.name}, vol: {lv.name}, offset: {offset}')
+ device_handler.lvm_vol_create(vg.name, lv, offset)
- def partuuid_to_index(self, uuid :str) -> Optional[int]:
- for i in range(storage['DISK_RETRY_ATTEMPTS']):
- self.partprobe()
- time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i))
-
- # We'll use unreliable lbslk to grab children under the /dev/<device>
- output = json.loads(SysCommand(f"lsblk --json {self.blockdevice.device}").decode('UTF-8'))
-
- for device in output['blockdevices']:
- for index, partition in enumerate(device.get('children', [])):
- # But we'll use blkid to reliably grab the PARTUUID for that child device (partition)
- partition_uuid = SysCommand(f"blkid -s PARTUUID -o value /dev/{partition.get('name')}").decode().strip()
- if partition_uuid.lower() == uuid.lower():
- return index
-
- raise DiskError(f"Failed to convert PARTUUID {uuid} to a partition index number on blockdevice {self.blockdevice.device}")
-
- def load_layout(self, layout :Dict[str, Any]) -> None:
- from ..luks import luks2
- from .btrfs import BTRFSPartition
-
- # If the layout tells us to wipe the drive, we do so
- if layout.get('wipe', False):
- if self.mode == GPT:
- if not self.parted_mklabel(self.blockdevice.device, "gpt"):
- raise KeyError(f"Could not create a GPT label on {self}")
- elif self.mode == MBR:
- if not self.parted_mklabel(self.blockdevice.device, "msdos"):
- raise KeyError(f"Could not create a MS-DOS label on {self}")
-
- self.blockdevice.flush_cache()
- time.sleep(3)
-
- prev_partition = None
- # We then iterate the partitions in order
- for partition in layout.get('partitions', []):
- # We don't want to re-add an existing partition (those containing a UUID already)
- if partition.get('wipe', False) and not partition.get('PARTUUID', None):
- start = partition.get('start') or (
- prev_partition and f'{prev_partition["device_instance"].end_sectors}s' or DEFAULT_PARTITION_START)
- partition['device_instance'] = self.add_partition(partition.get('type', 'primary'),
- start=start,
- end=partition.get('size', '100%'),
- partition_format=partition.get('filesystem', {}).get('format', 'btrfs'),
- skip_mklabel=layout.get('wipe', False) is not False)
-
- elif (partition_uuid := partition.get('PARTUUID')):
- # We try to deal with both UUID and PARTUUID of a partition when it's being re-used.
- # We should re-name or separate this logi based on partition.get('PARTUUID') and partition.get('UUID')
- # but for now, lets just attempt to deal with both.
- try:
- partition['device_instance'] = self.blockdevice.get_partition(uuid=partition_uuid)
- except DiskError:
- partition['device_instance'] = self.blockdevice.get_partition(partuuid=partition_uuid)
-
- log(_("Re-using partition instance: {}").format(partition['device_instance']), level=logging.DEBUG, fg="gray")
+ while True:
+ debug('Fetching LVM volume info')
+ lv_info = device_handler.lvm_vol_info(lv.name)
+ if lv_info is not None:
+ break
+
+ time.sleep(1)
+
+ self._lvm_vol_handle_e2scrub(vg)
+
+ def _format_lvm_vols(
+ self,
+ lvm_config: LvmConfiguration,
+ enc_vols: Dict[LvmVolume, Luks2] = {}
+ ):
+ for vol in lvm_config.get_all_volumes():
+ if enc_vol := enc_vols.get(vol, None):
+ if not enc_vol.mapper_dev:
+ raise ValueError('No mapper device defined')
+ path = enc_vol.mapper_dev
else:
- log(f"{self}.load_layout() doesn't know how to work without 'wipe' being set or UUID ({partition.get('PARTUUID')}) was given and found.", fg="yellow", level=logging.WARNING)
- continue
-
- if partition.get('filesystem', {}).get('format', False):
- # needed for backward compatibility with the introduction of the new "format_options"
- format_options = partition.get('options',[]) + partition.get('filesystem',{}).get('format_options',[])
- disk_encryption: DiskEncryption = storage['arguments'].get('disk_encryption')
-
- if disk_encryption and partition in disk_encryption.all_partitions:
- if not partition['device_instance']:
- raise DiskError(f"Internal error caused us to loose the partition. Please report this issue upstream!")
-
- if partition.get('mountpoint',None):
- loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop"
- else:
- loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}"
-
- partition['device_instance'].encrypt(password=disk_encryption.encryption_password)
- # Immediately unlock the encrypted device to format the inner volume
- with luks2(partition['device_instance'], loopdev, disk_encryption.encryption_password, auto_unmount=True) as unlocked_device:
- if not partition.get('wipe'):
- if storage['arguments'] == 'silent':
- raise ValueError(f"Missing fs-type to format on newly created encrypted partition {partition['device_instance']}")
- else:
- if not partition.get('filesystem'):
- partition['filesystem'] = {}
-
- if not partition['filesystem'].get('format', False):
- while True:
- partition['filesystem']['format'] = input(f"Enter a valid fs-type for newly encrypted partition {partition['filesystem']['format']}: ").strip()
- if not partition['filesystem']['format'] or valid_fs_type(partition['filesystem']['format']) is False:
- log(_("You need to enter a valid fs-type in order to continue. See `man parted` for valid fs-type's."))
- continue
- break
-
- unlocked_device.format(partition['filesystem']['format'], options=format_options)
-
- elif partition.get('wipe', False):
- if not partition['device_instance']:
- raise DiskError(f"Internal error caused us to loose the partition. Please report this issue upstream!")
-
- partition['device_instance'].format(partition['filesystem']['format'], options=format_options)
-
- if partition['filesystem']['format'] == 'btrfs':
- # We upgrade the device instance to a BTRFSPartition if we format it as such.
- # This is so that we can gain access to more features than otherwise available in Partition()
- partition['device_instance'] = BTRFSPartition(
- partition['device_instance'].path,
- block_device=partition['device_instance'].block_device,
- encrypted=False,
- filesystem='btrfs',
- autodetect_filesystem=False
- )
-
- if partition.get('boot', False):
- log(f"Marking partition {partition['device_instance']} as bootable.")
- self.set(self.partuuid_to_index(partition['device_instance'].part_uuid), 'boot on')
-
- prev_partition = partition
-
- def find_partition(self, mountpoint :str) -> Partition:
- for partition in self.blockdevice:
- if partition.target_mountpoint == mountpoint or partition.mountpoint == mountpoint:
- return partition
-
- def partprobe(self) -> bool:
- try:
- SysCommand(f'partprobe {self.blockdevice.device}')
- except SysCallError as error:
- log(f"Could not execute partprobe: {error!r}", level=logging.ERROR, fg="red")
- raise DiskError(f"Could not run partprobe on {self.blockdevice.device}: {error!r}")
+ path = vol.safe_dev_path
- return True
+ # wait a bit otherwise the mkfs will fail as it can't
+ # find the mapper device yet
+ device_handler.format(vol.fs_type, path)
- def raw_parted(self, string: str) -> SysCommand:
- try:
- cmd_handle = SysCommand(f'/usr/bin/parted -s {string}')
- time.sleep(0.5)
- return cmd_handle
- except SysCallError as error:
- log(f"Parted ended with a bad exit code: {error.exit_code} ({error})", level=logging.ERROR, fg="red")
- return error
+ if vol.fs_type == FilesystemType.Btrfs:
+ device_handler.create_lvm_btrfs_subvolumes(path, vol.btrfs_subvols, vol.mount_options)
- def parted(self, string: str) -> bool:
- """
- Performs a parted execution of the given string
+ def _lvm_create_pvs(
+ self,
+ lvm_config: LvmConfiguration,
+ enc_mods: Dict[PartitionModification, Luks2] = {}
+ ):
+ pv_paths: Set[Path] = set()
- :param string: A raw string passed to /usr/bin/parted -s <string>
- :type string: str
- """
- if (parted_handle := self.raw_parted(string)).exit_code == 0:
- return self.partprobe()
- else:
- raise DiskError(f"Parted failed to add a partition: {parted_handle}")
+ for vg in lvm_config.vol_groups:
+ pv_paths |= self._get_all_pv_dev_paths(vg.pvs, enc_mods)
- def use_entire_disk(self, root_filesystem_type :str = 'ext4') -> Partition:
- # TODO: Implement this with declarative profiles instead.
- raise ValueError("Installation().use_entire_disk() has to be re-worked.")
+ device_handler.lvm_pv_create(pv_paths)
- def add_partition(
+ def _get_all_pv_dev_paths(
self,
- partition_type :str,
- start :str,
- end :str,
- partition_format :Optional[str] = None,
- skip_mklabel :bool = False
- ) -> Partition:
- log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO)
-
- if len(self.blockdevice.partitions) == 0 and skip_mklabel is False:
- # If it's a completely empty drive, and we're about to add partitions to it
- # we need to make sure there's a filesystem label.
- if self.mode == GPT:
- if not self.parted_mklabel(self.blockdevice.device, "gpt"):
- raise KeyError(f"Could not create a GPT label on {self}")
- elif self.mode == MBR:
- if not self.parted_mklabel(self.blockdevice.device, "msdos"):
- raise KeyError(f"Could not create a MS-DOS label on {self}")
-
- self.blockdevice.flush_cache()
-
- previous_partuuids = []
- for partition in self.blockdevice.partitions.values():
- try:
- previous_partuuids.append(partition.part_uuid)
- except DiskError:
- pass
-
- # TODO this check should probably run in the setup process rather than during the installation
- if self.mode == MBR:
- if len(self.blockdevice.partitions) > 3:
- DiskError("Too many partitions on disk, MBR disks can only have 3 primary partitions")
-
- if partition_format:
- parted_string = f'{self.blockdevice.device} mkpart {partition_type} {partition_format} {start} {end}'
- else:
- parted_string = f'{self.blockdevice.device} mkpart {partition_type} {start} {end}'
-
- log(f"Adding partition using the following parted command: {parted_string}", level=logging.DEBUG)
-
- if self.parted(parted_string):
- for count in range(storage.get('DISK_RETRY_ATTEMPTS', 3)):
- self.blockdevice.flush_cache()
-
- new_partition_uuids = [partition.part_uuid for partition in self.blockdevice.partitions.values()]
- new_partuuid_set = (set(previous_partuuids) ^ set(new_partition_uuids))
-
- if len(new_partuuid_set) and (new_partuuid := new_partuuid_set.pop()):
- try:
- return self.blockdevice.get_partition(partuuid=new_partuuid)
- except Exception as err:
- log(f'Blockdevice: {self.blockdevice}', level=logging.ERROR, fg="red")
- log(f'Partitions: {self.blockdevice.partitions}', level=logging.ERROR, fg="red")
- log(f'Partition set: {new_partuuid_set}', level=logging.ERROR, fg="red")
- log(f'New PARTUUID: {[new_partuuid]}', level=logging.ERROR, fg="red")
- log(f'get_partition(): {self.blockdevice.get_partition}', level=logging.ERROR, fg="red")
- raise err
- else:
- log(f"Could not get UUID for partition. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s before retrying.",level=logging.DEBUG)
- self.partprobe()
- time.sleep(max(0.1, storage.get('DISK_TIMEOUTS', 1)))
- else:
- print("Parted did not return True during partition creation")
+ pvs: List[PartitionModification],
+ enc_mods: Dict[PartitionModification, Luks2] = {}
+ ) -> Set[Path]:
+ pv_paths: Set[Path] = set()
+
+ for pv in pvs:
+ if enc_pv := enc_mods.get(pv, None):
+ if mapper := enc_pv.mapper_dev:
+ pv_paths.add(mapper)
+ else:
+ pv_paths.add(pv.safe_dev_path)
+
+ return pv_paths
+
+ def _encrypt_lvm_vols(
+ self,
+ lvm_config: LvmConfiguration,
+ enc_config: DiskEncryption,
+ lock_after_create: bool = True
+ ) -> Dict[LvmVolume, Luks2]:
+ enc_vols: Dict[LvmVolume, Luks2] = {}
+
+ for vol in lvm_config.get_all_volumes():
+ if vol in enc_config.lvm_volumes:
+ luks_handler = device_handler.encrypt(
+ vol.safe_dev_path,
+ vol.mapper_name,
+ enc_config.encryption_password,
+ lock_after_create
+ )
+
+ enc_vols[vol] = luks_handler
+
+ return enc_vols
+
+ def _encrypt_partitions(
+ self,
+ enc_config: DiskEncryption,
+ lock_after_create: bool = True
+ ) -> Dict[PartitionModification, Luks2]:
+ enc_mods: Dict[PartitionModification, Luks2] = {}
+
+ for mod in self._disk_config.device_modifications:
+ partitions = mod.partitions
+
+ # don't touch existing partitions
+ filtered_part = [p for p in partitions if not p.exists()]
+
+ self._validate_partitions(filtered_part)
+
+ # make sure all devices are unmounted
+ device_handler.umount_all_existing(mod.device_path)
+
+ enc_mods = {}
+
+ for part_mod in filtered_part:
+ if part_mod in enc_config.partitions:
+ luks_handler = device_handler.encrypt(
+ part_mod.safe_dev_path,
+ part_mod.mapper_name,
+ enc_config.encryption_password,
+ lock_after_create=lock_after_create
+ )
+
+ enc_mods[part_mod] = luks_handler
- total_partitions = set([partition.part_uuid for partition in self.blockdevice.partitions.values()])
- total_partitions.update(previous_partuuids)
+ return enc_mods
- # TODO: This should never be able to happen
- log(f"Could not find the new PARTUUID after adding the partition.", level=logging.ERROR, fg="red")
- log(f"Previous partitions: {previous_partuuids}", level=logging.ERROR, fg="red")
- log(f"New partitions: {total_partitions}", level=logging.ERROR, fg="red")
+ def _lvm_vol_handle_e2scrub(self, vol_gp: LvmVolumeGroup):
+ # from arch wiki:
+ # If a logical volume will be formatted with ext4, leave at least 256 MiB
+ # free space in the volume group to allow using e2scrub
+ if any([vol.fs_type == FilesystemType.Ext4 for vol in vol_gp.volumes]):
+ largest_vol = max(vol_gp.volumes, key=lambda x: x.length)
- raise DiskError(f"Could not add partition using: {parted_string}")
+ device_handler.lvm_vol_reduce(
+ largest_vol.safe_dev_path,
+ Size(256, Unit.MiB, SectorSize.default())
+ )
- def set_name(self, partition: int, name: str) -> bool:
- return self.parted(f'{self.blockdevice.device} name {partition + 1} "{name}"') == 0
+ def _do_countdown(self) -> bool:
+ SIG_TRIGGER = False
- def set(self, partition: int, string: str) -> bool:
- log(f"Setting {string} on (parted) partition index {partition+1}", level=logging.INFO)
- return self.parted(f'{self.blockdevice.device} set {partition + 1} {string}') == 0
+ def kill_handler(sig: int, frame: Any) -> None:
+ print()
+ exit(0)
- def parted_mklabel(self, device: str, disk_label: str) -> bool:
- log(f"Creating a new partition label on {device}", level=logging.INFO, fg="yellow")
- # Try to unmount devices before attempting to run mklabel
- try:
- SysCommand(f'bash -c "umount {device}?"')
- except:
- pass
+ def sig_handler(sig: int, frame: Any) -> None:
+ signal.signal(signal.SIGINT, kill_handler)
- self.partprobe()
- worked = self.raw_parted(f'{device} mklabel {disk_label}').exit_code == 0
- self.partprobe()
+ original_sigint_handler = signal.getsignal(signal.SIGINT)
+ signal.signal(signal.SIGINT, sig_handler)
- return worked
+ for i in range(5, 0, -1):
+ print(f"{i}", end='')
+
+ for x in range(4):
+ sys.stdout.flush()
+ time.sleep(0.25)
+ print(".", end='')
+
+ if SIG_TRIGGER:
+ prompt = _('Do you really want to abort?')
+ choice = Menu(prompt, Menu.yes_no(), skip=False).run()
+ if choice.value == Menu.yes():
+ exit(0)
+
+ if SIG_TRIGGER is False:
+ sys.stdin.read()
+
+ SIG_TRIGGER = False
+ signal.signal(signal.SIGINT, sig_handler)
+
+ print()
+ signal.signal(signal.SIGINT, original_sigint_handler)
+
+ return True