Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk/filesystem.py
diff options
context:
space:
mode:
authorDaniel Girtler <blackrabbit256@gmail.com>2023-04-19 20:55:42 +1000
committerGitHub <noreply@github.com>2023-04-19 12:55:42 +0200
commit00b0ae7ba439a5a420095175b3bedd52c569db51 (patch)
treef02d081e361d5e65603f74dea3873dcc6606cf7c /archinstall/lib/disk/filesystem.py
parent5253e57e9f26cf3e59cb2460544af13f56e485bb (diff)
PyParted and a large rewrite of the underlying partitioning (#1604)
* Invert mypy files * Add optional pre-commit hooks * New profile structure * Serialize profiles * Use profile instead of classmethod * Custom profile setup * Separator between back * Support profile import via url * Move profiles module * Refactor files * Remove symlink * Add user to docker group * Update schema description * Handle list services * mypy fixes * mypy fixes * Rename profilesv2 to profiles * flake8 * mypy again * Support selecting DM * Fix mypy * Cleanup * Update greeter setting * Update schema * Revert toml changes * Poc external dependencies * Dependency support * New encryption menu * flake8 * Mypy and flake8 * Unify lsblk command * Update bootloader configuration * Git hooks * Fix import * Pyparted * Remove custom font setting * flake8 * Remove default preview * Manual partitioning menu * Update structure * Disk configuration * Update filesystem * luks2 encryption * Everything works until installation * Btrfsutil * Btrfs handling * Update btrfs * Save encryption config * Fix pipewire issue * Update mypy version * Update all pre-commit * Update package versions * Revert audio/pipewire * Merge master PRs * Add master changes * Merge master changes * Small renaming * Pull master changes * Reset disk enc after disk config change * Generate locals * Update naming * Fix imports * Fix broken sync * Fix pre selection on table menu * Profile menu * Update profile * Fix post_install * Added python-pyparted to PKGBUILD, this requires [testing] to be enabled in order to run makepkg. Package still works via python -m build etc. * Swaped around some setuptools logic in pyproject Since we define `package-data` and `packages` there should be no need for: ``` [tool.setuptools.packages.find] where = ["archinstall", "archinstall.*"] ``` * Removed pyproject collisions. Duplicate definitions. * Made sure pyproject.toml includes languages * Add example and update README * Fix pyproject issues * Generate locale * Refactor imports * Simplify imports * Add profile description and package examples * Align code * Fix mypy * Simplify imports * Fix saving config * Fix wrong luks merge * Refactor installation * Fix cdrom device loading * Fix wrongly merged code * Fix imports and greeter * Don't terminate on partprobe error * Use specific path on partprobe from luks * Update archinstall/lib/disk/device_model.py Co-authored-by: codefiles <11915375+codefiles@users.noreply.github.com> * Update archinstall/lib/disk/device_model.py Co-authored-by: codefiles <11915375+codefiles@users.noreply.github.com> * Update github workflow to test archinstall installation * Update sway merge * Generate locales * Update workflow --------- Co-authored-by: Daniel Girtler <girtler.daniel@gmail.com> Co-authored-by: Anton Hvornum <anton@hvornum.se> Co-authored-by: Anton Hvornum <anton.feeds+github@gmail.com> Co-authored-by: codefiles <11915375+codefiles@users.noreply.github.com>
Diffstat (limited to 'archinstall/lib/disk/filesystem.py')
-rw-r--r--archinstall/lib/disk/filesystem.py343
1 files changed, 70 insertions, 273 deletions
diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py
index 1083df53..6ea99340 100644
--- a/archinstall/lib/disk/filesystem.py
+++ b/archinstall/lib/disk/filesystem.py
@@ -1,301 +1,98 @@
from __future__ import annotations
-import time
-import logging
-import json
-import pathlib
-from typing import Optional, Dict, Any, TYPE_CHECKING
-# https://stackoverflow.com/a/39757388/929999
-from ..models.disk_encryption import DiskEncryption
-if TYPE_CHECKING:
- from .blockdevice import BlockDevice
- _: Any
+import logging
+import signal
+import sys
+import time
+from typing import Any, Optional, TYPE_CHECKING
-from .partition import Partition
-from .validators import valid_fs_type
-from ..exceptions import DiskError, SysCallError
-from ..general import SysCommand
+from .device_model import DiskLayoutConfiguration, DiskLayoutType, PartitionTable, FilesystemType, DiskEncryption
+from .device_handler import device_handler
+from ..hardware import has_uefi
from ..output import log
-from ..storage import storage
-
-GPT = 0b00000001
-MBR = 0b00000010
-
-# A sane default is 5MiB, that allows for plenty of buffer for GRUB on MBR
-# but also 4MiB for memory cards for instance. And another 1MiB to avoid issues.
-# (we've been pestered by disk issues since the start, so please let this be here for a few versions)
-DEFAULT_PARTITION_START = '5MiB'
-
-class Filesystem:
- # TODO:
- # When instance of a HDD is selected, check all usages and gracefully unmount them
- # as well as close any crypto handles.
- def __init__(self, blockdevice :BlockDevice, mode :int):
- self.blockdevice = blockdevice
- self.mode = mode
-
- def __enter__(self, *args :str, **kwargs :str) -> 'Filesystem':
- return self
-
- def __repr__(self) -> str:
- return f"Filesystem(blockdevice={self.blockdevice}, mode={self.mode})"
+from ..menu import Menu
- def __exit__(self, *args :str, **kwargs :str) -> bool:
- # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
- if len(args) >= 2 and args[1]:
- raise args[1]
-
- SysCommand('sync')
- return True
-
- def partuuid_to_index(self, uuid :str) -> Optional[int]:
- for i in range(storage['DISK_RETRY_ATTEMPTS']):
- self.partprobe()
- time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i))
-
- # We'll use unreliable lbslk to grab children under the /dev/<device>
- output = json.loads(SysCommand(f"lsblk --json {self.blockdevice.device}").decode('UTF-8'))
+if TYPE_CHECKING:
+ _: Any
- for device in output['blockdevices']:
- for index, partition in enumerate(device.get('children', [])):
- # But we'll use blkid to reliably grab the PARTUUID for that child device (partition)
- partition_uuid = SysCommand(f"blkid -s PARTUUID -o value /dev/{partition.get('name')}").decode().strip()
- if partition_uuid.lower() == uuid.lower():
- return index
- raise DiskError(f"Failed to convert PARTUUID {uuid} to a partition index number on blockdevice {self.blockdevice.device}")
+class FilesystemHandler:
+ def __init__(
+ self,
+ disk_config: DiskLayoutConfiguration,
+ enc_conf: Optional[DiskEncryption] = None
+ ):
+ self._disk_config = disk_config
+ self._enc_config = enc_conf
- def load_layout(self, layout :Dict[str, Any]) -> None:
- from ..luks import luks2
- from .btrfs import BTRFSPartition
+ def perform_filesystem_operations(self, show_countdown: bool = True):
+ if self._disk_config.config_type == DiskLayoutType.Pre_mount:
+ log('Disk layout configuration is set to pre-mount, not performing any operations', level=logging.DEBUG)
+ return
- # If the layout tells us to wipe the drive, we do so
- if layout.get('wipe', False):
- if self.mode == GPT:
- if not self.parted_mklabel(self.blockdevice.device, "gpt"):
- raise KeyError(f"Could not create a GPT label on {self}")
- elif self.mode == MBR:
- if not self.parted_mklabel(self.blockdevice.device, "msdos"):
- raise KeyError(f"Could not create a MS-DOS label on {self}")
+ device_mods = list(filter(lambda x: len(x.partitions) > 0, self._disk_config.device_modifications))
- self.blockdevice.flush_cache()
- time.sleep(3)
+ if not device_mods:
+ log('No modifications required', level=logging.DEBUG)
+ return
- prev_partition = None
- # We then iterate the partitions in order
- for partition in layout.get('partitions', []):
- # We don't want to re-add an existing partition (those containing a UUID already)
- if partition.get('wipe', False) and not partition.get('PARTUUID', None):
- start = partition.get('start') or (
- prev_partition and f'{prev_partition["device_instance"].end_sectors}s' or DEFAULT_PARTITION_START)
- partition['device_instance'] = self.add_partition(partition.get('type', 'primary'),
- start=start,
- end=partition.get('size', '100%'),
- partition_format=partition.get('filesystem', {}).get('format', 'btrfs'),
- skip_mklabel=layout.get('wipe', False) is not False)
+ device_paths = ', '.join([str(mod.device.device_info.path) for mod in device_mods])
- elif (partition_uuid := partition.get('PARTUUID')):
- # We try to deal with both UUID and PARTUUID of a partition when it's being re-used.
- # We should re-name or separate this logi based on partition.get('PARTUUID') and partition.get('UUID')
- # but for now, lets just attempt to deal with both.
- try:
- partition['device_instance'] = self.blockdevice.get_partition(uuid=partition_uuid)
- except DiskError:
- partition['device_instance'] = self.blockdevice.get_partition(partuuid=partition_uuid)
+ # Issue a final warning before we continue with something un-revertable.
+ # We mention the drive one last time, and count from 5 to 0.
+ print(str(_(' ! Formatting {} in ')).format(device_paths))
- log(_("Re-using partition instance: {}").format(partition['device_instance']), level=logging.DEBUG, fg="gray")
- else:
- log(f"{self}.load_layout() doesn't know how to work without 'wipe' being set or UUID ({partition.get('PARTUUID')}) was given and found.", fg="yellow", level=logging.WARNING)
- continue
+ if show_countdown:
+ self._do_countdown()
- if partition.get('filesystem', {}).get('format', False):
- # needed for backward compatibility with the introduction of the new "format_options"
- format_options = partition.get('options',[]) + partition.get('filesystem',{}).get('format_options',[])
- disk_encryption: DiskEncryption = storage['arguments'].get('disk_encryption')
+ # Setup the blockdevice, filesystem (and optionally encryption).
+ # Once that's done, we'll hand over to perform_installation()
+ partition_table = PartitionTable.GPT
+ if has_uefi() is False:
+ partition_table = PartitionTable.MBR
- if disk_encryption and partition in disk_encryption.all_partitions:
- if not partition['device_instance']:
- raise DiskError(f"Internal error caused us to loose the partition. Please report this issue upstream!")
+ for mod in device_mods:
+ device_handler.partition(mod, partition_table=partition_table)
+ device_handler.format(mod, enc_conf=self._enc_config)
- if partition.get('mountpoint',None):
- loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop"
- else:
- loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}"
+ for part_mod in mod.partitions:
+ if part_mod.fs_type == FilesystemType.Btrfs:
+ device_handler.create_btrfs_volumes(part_mod, enc_conf=self._enc_config)
- partition['device_instance'].encrypt(password=disk_encryption.encryption_password)
- # Immediately unlock the encrypted device to format the inner volume
- with luks2(partition['device_instance'], loopdev, disk_encryption.encryption_password, auto_unmount=True) as unlocked_device:
- if not partition.get('wipe'):
- if storage['arguments'] == 'silent':
- raise ValueError(f"Missing fs-type to format on newly created encrypted partition {partition['device_instance']}")
- else:
- if not partition.get('filesystem'):
- partition['filesystem'] = {}
+ def _do_countdown(self) -> bool:
+ SIG_TRIGGER = False
- if not partition['filesystem'].get('format', False):
- while True:
- partition['filesystem']['format'] = input(f"Enter a valid fs-type for newly encrypted partition {partition['filesystem']['format']}: ").strip()
- if not partition['filesystem']['format'] or valid_fs_type(partition['filesystem']['format']) is False:
- log(_("You need to enter a valid fs-type in order to continue. See `man parted` for valid fs-type's."))
- continue
- break
+ def kill_handler(sig: int, frame: Any) -> None:
+ print()
+ exit(0)
- unlocked_device.format(partition['filesystem']['format'], options=format_options)
+ def sig_handler(sig: int, frame: Any) -> None:
+ signal.signal(signal.SIGINT, kill_handler)
- elif partition.get('wipe', False):
- if not partition['device_instance']:
- raise DiskError(f"Internal error caused us to loose the partition. Please report this issue upstream!")
+ original_sigint_handler = signal.getsignal(signal.SIGINT)
+ signal.signal(signal.SIGINT, sig_handler)
- partition['device_instance'].format(partition['filesystem']['format'], options=format_options)
+ for i in range(5, 0, -1):
+ print(f"{i}", end='')
- if partition['filesystem']['format'] == 'btrfs':
- # We upgrade the device instance to a BTRFSPartition if we format it as such.
- # This is so that we can gain access to more features than otherwise available in Partition()
- partition['device_instance'] = BTRFSPartition(
- partition['device_instance'].path,
- block_device=partition['device_instance'].block_device,
- encrypted=False,
- filesystem='btrfs',
- autodetect_filesystem=False
- )
+ for x in range(4):
+ sys.stdout.flush()
+ time.sleep(0.25)
+ print(".", end='')
- if partition.get('boot', False):
- log(f"Marking partition {partition['device_instance']} as bootable.")
- self.set(self.partuuid_to_index(partition['device_instance'].part_uuid), 'boot on')
+ if SIG_TRIGGER:
+ prompt = _('Do you really want to abort?')
+ choice = Menu(prompt, Menu.yes_no(), skip=False).run()
+ if choice.value == Menu.yes():
+ exit(0)
- prev_partition = partition
+ if SIG_TRIGGER is False:
+ sys.stdin.read()
- def find_partition(self, mountpoint :str) -> Partition:
- for partition in self.blockdevice:
- if partition.target_mountpoint == mountpoint or partition.mountpoint == mountpoint:
- return partition
+ SIG_TRIGGER = False
+ signal.signal(signal.SIGINT, sig_handler)
- def partprobe(self) -> bool:
- try:
- SysCommand(f'partprobe {self.blockdevice.device}')
- except SysCallError as error:
- log(f"Could not execute partprobe: {error!r}", level=logging.ERROR, fg="red")
- raise DiskError(f"Could not run partprobe on {self.blockdevice.device}: {error!r}")
+ print()
+ signal.signal(signal.SIGINT, original_sigint_handler)
return True
-
- def raw_parted(self, string: str) -> SysCommand:
- try:
- cmd_handle = SysCommand(f'/usr/bin/parted -s {string}')
- time.sleep(0.5)
- return cmd_handle
- except SysCallError as error:
- log(f"Parted ended with a bad exit code: {error.exit_code} ({error})", level=logging.ERROR, fg="red")
- return error
-
- def parted(self, string: str) -> bool:
- """
- Performs a parted execution of the given string
-
- :param string: A raw string passed to /usr/bin/parted -s <string>
- :type string: str
- """
- if (parted_handle := self.raw_parted(string)).exit_code == 0:
- return self.partprobe()
- else:
- raise DiskError(f"Parted failed to add a partition: {parted_handle}")
-
- def use_entire_disk(self, root_filesystem_type :str = 'ext4') -> Partition:
- # TODO: Implement this with declarative profiles instead.
- raise ValueError("Installation().use_entire_disk() has to be re-worked.")
-
- def add_partition(
- self,
- partition_type :str,
- start :str,
- end :str,
- partition_format :Optional[str] = None,
- skip_mklabel :bool = False
- ) -> Partition:
- log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO)
-
- if len(self.blockdevice.partitions) == 0 and skip_mklabel is False:
- # If it's a completely empty drive, and we're about to add partitions to it
- # we need to make sure there's a filesystem label.
- if self.mode == GPT:
- if not self.parted_mklabel(self.blockdevice.device, "gpt"):
- raise KeyError(f"Could not create a GPT label on {self}")
- elif self.mode == MBR:
- if not self.parted_mklabel(self.blockdevice.device, "msdos"):
- raise KeyError(f"Could not create a MS-DOS label on {self}")
-
- self.blockdevice.flush_cache()
-
- previous_partuuids = []
- for partition in self.blockdevice.partitions.values():
- try:
- previous_partuuids.append(partition.part_uuid)
- except DiskError:
- pass
-
- # TODO this check should probably run in the setup process rather than during the installation
- if self.mode == MBR:
- if len(self.blockdevice.partitions) > 3:
- DiskError("Too many partitions on disk, MBR disks can only have 3 primary partitions")
-
- if partition_format:
- parted_string = f'{self.blockdevice.device} mkpart {partition_type} {partition_format} {start} {end}'
- else:
- parted_string = f'{self.blockdevice.device} mkpart {partition_type} {start} {end}'
-
- log(f"Adding partition using the following parted command: {parted_string}", level=logging.DEBUG)
-
- if self.parted(parted_string):
- for count in range(storage.get('DISK_RETRY_ATTEMPTS', 3)):
- self.blockdevice.flush_cache()
-
- new_partition_uuids = [partition.part_uuid for partition in self.blockdevice.partitions.values()]
- new_partuuid_set = (set(previous_partuuids) ^ set(new_partition_uuids))
-
- if len(new_partuuid_set) and (new_partuuid := new_partuuid_set.pop()):
- try:
- return self.blockdevice.get_partition(partuuid=new_partuuid)
- except Exception as err:
- log(f'Blockdevice: {self.blockdevice}', level=logging.ERROR, fg="red")
- log(f'Partitions: {self.blockdevice.partitions}', level=logging.ERROR, fg="red")
- log(f'Partition set: {new_partuuid_set}', level=logging.ERROR, fg="red")
- log(f'New PARTUUID: {[new_partuuid]}', level=logging.ERROR, fg="red")
- log(f'get_partition(): {self.blockdevice.get_partition}', level=logging.ERROR, fg="red")
- raise err
- else:
- log(f"Could not get UUID for partition. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s before retrying.",level=logging.DEBUG)
- self.partprobe()
- time.sleep(max(0.1, storage.get('DISK_TIMEOUTS', 1)))
- else:
- print("Parted did not return True during partition creation")
-
- total_partitions = set([partition.part_uuid for partition in self.blockdevice.partitions.values()])
- total_partitions.update(previous_partuuids)
-
- # TODO: This should never be able to happen
- log(f"Could not find the new PARTUUID after adding the partition.", level=logging.ERROR, fg="red")
- log(f"Previous partitions: {previous_partuuids}", level=logging.ERROR, fg="red")
- log(f"New partitions: {total_partitions}", level=logging.ERROR, fg="red")
-
- raise DiskError(f"Could not add partition using: {parted_string}")
-
- def set_name(self, partition: int, name: str) -> bool:
- return self.parted(f'{self.blockdevice.device} name {partition + 1} "{name}"') == 0
-
- def set(self, partition: int, string: str) -> bool:
- log(f"Setting {string} on (parted) partition index {partition+1}", level=logging.INFO)
- return self.parted(f'{self.blockdevice.device} set {partition + 1} {string}') == 0
-
- def parted_mklabel(self, device: str, disk_label: str) -> bool:
- log(f"Creating a new partition label on {device}", level=logging.INFO, fg="yellow")
- # Try to unmount devices before attempting to run mklabel
- try:
- SysCommand(f'bash -c "umount {device}?"')
- except:
- pass
-
- self.partprobe()
- worked = self.raw_parted(f'{device} mklabel {disk_label}').exit_code == 0
- self.partprobe()
-
- return worked