index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
-rw-r--r-- | archinstall/lib/disk/blockdevice.py | 348 | ||||
-rw-r--r-- | archinstall/lib/disk/btrfs/__init__.py | 132 | ||||
-rw-r--r-- | archinstall/lib/disk/btrfs/btrfs_helpers.py | 126 | ||||
-rw-r--r-- | archinstall/lib/disk/btrfs/btrfspartition.py | 37 | ||||
-rw-r--r-- | archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py (renamed from archinstall/lib/disk/btrfs/btrfssubvolume.py) | 9 | ||||
-rw-r--r-- | archinstall/lib/disk/filesystem.py | 38 | ||||
-rw-r--r-- | archinstall/lib/disk/helpers.py | 50 | ||||
-rw-r--r-- | archinstall/lib/disk/mapperdev.py | 16 | ||||
-rw-r--r-- | archinstall/lib/disk/partition.py | 426 | ||||
-rw-r--r-- | archinstall/lib/disk/user_guides.py | 19 | ||||
-rw-r--r-- | archinstall/lib/disk/validators.py | 8 |
diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py index c7b69205..736bacbc 100644 --- a/archinstall/lib/disk/blockdevice.py +++ b/archinstall/lib/disk/blockdevice.py @@ -1,13 +1,11 @@ from __future__ import annotations -import os import json import logging import time -from functools import cached_property -from typing import Optional, Dict, Any, Iterator, Tuple, List, TYPE_CHECKING -# https://stackoverflow.com/a/39757388/929999 -if TYPE_CHECKING: - from .partition import Partition + +from collections import OrderedDict +from dataclasses import dataclass +from typing import Optional, Dict, Any, Iterator, List, TYPE_CHECKING from ..exceptions import DiskError, SysCallError from ..output import log @@ -15,18 +13,44 @@ from ..general import SysCommand from ..storage import storage +if TYPE_CHECKING: + from .partition import Partition + _: Any + + +@dataclass +class BlockSizeInfo: + start: str + end: str + size: str + + +@dataclass +class BlockInfo: + pttype: str + ptuuid: str + size: int + tran: Optional[str] + rota: bool + free_space: Optional[List[BlockSizeInfo]] + + class BlockDevice: def __init__(self, path :str, info :Optional[Dict[str, Any]] = None): if not info: from .helpers import all_blockdevices # If we don't give any information, we need to auto-fill it. # Otherwise any subsequent usage will break. - info = all_blockdevices(partitions=False)[path].info + self.info = all_blockdevices(partitions=False)[path].info + else: + self.info = info - self.path = path - self.info = info + self._path = path self.keep_partitions = True - self.part_cache = {} + self._block_info = self._fetch_information() + self._partitions: Dict[str, 'Partition'] = {} + + self._load_partitions() # TODO: Currently disk encryption is a BIT misleading. # It's actually partition-encryption, but for future-proofing this @@ -35,70 +59,113 @@ class BlockDevice: def __repr__(self, *args :str, **kwargs :str) -> str: return self._str_repr - @cached_property + @property + def path(self) -> str: + return self._path + + @property def _str_repr(self) -> str: - return f"BlockDevice({self.device_or_backfile}, size={self._safe_size}GB, free_space={self._safe_free_space}, bus_type={self.bus_type})" - - @cached_property - def display_info(self) -> str: - columns = { - str(_('Device')): self.device_or_backfile, - str(_('Size')): f'{self._safe_size}GB', - str(_('Free space')): f'{self._safe_free_space}', + return f"BlockDevice({self._device_or_backfile}, size={self.size}GB, free_space={self._safe_free_space()}, bus_type={self.bus_type})" + + def as_json(self) -> Dict[str, Any]: + return { + str(_('Device')): self._device_or_backfile, + str(_('Size')): f'{self.size}GB', + str(_('Free space')): f'{self._safe_free_space()}', str(_('Bus-type')): f'{self.bus_type}' } - padding = max([len(k) for k in columns.keys()]) - - pretty = '' - for k, v in columns.items(): - k = k.ljust(padding, ' ') - pretty += f'{k} = {v}\n' - - return pretty.rstrip() - - def __iter__(self) -> Iterator[Partition]: + def __iter__(self) -> Iterator['Partition']: for partition in self.partitions: yield self.partitions[partition] def __getitem__(self, key :str, *args :str, **kwargs :str) -> Any: if hasattr(self, key): return getattr(self, key) - elif key not in self.info: - raise KeyError(f'{self} does not contain information: "{key}"') - return self.info[key] - def __len__(self) -> int: - return len(self.partitions) + if self.info and key in self.info: + return self.info[key] + + raise KeyError(f'{self.info} does not contain information: "{key}"') def __lt__(self, left_comparitor :'BlockDevice') -> bool: - return self.path < left_comparitor.path + return self._path < left_comparitor.path def json(self) -> str: """ json() has precedence over __dump__, so this is a way to give less/partial information for user readability. """ - return self.path + return self._path def __dump__(self) -> Dict[str, Dict[str, Any]]: return { - self.path : { - 'partuuid' : self.uuid, - 'wipe' : self.info.get('wipe', None), - 'partitions' : [part.__dump__() for part in self.partitions.values()] + self._path: { + 'partuuid': self.uuid, + 'wipe': self.info.get('wipe', None), + 'partitions': [part.__dump__() for part in self.partitions.values()] } } - @property - def partition_type(self) -> str: - output = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.path}").decode('UTF-8')) + def _call_lsblk(self, path: str) -> Dict[str, Any]: + output = SysCommand(f'lsblk --json -b -o+SIZE,PTTYPE,ROTA,TRAN,PTUUID {self._path}').decode('UTF-8') + if output: + lsblk_info = json.loads(output) + return lsblk_info + + raise DiskError(f'Failed to read disk "{self.path}" with lsblk') + + def _load_partitions(self): + from .partition import Partition + + self._partitions.clear() + + lsblk_info = self._call_lsblk(self._path) + device = lsblk_info['blockdevices'][0] + self._partitions.clear() + + if children := device.get('children', None): + root = f'/dev/{device["name"]}' + for child in children: + part_id = child['name'].removeprefix(device['name']) + self._partitions[part_id] = Partition(root + part_id, block_device=self, part_id=part_id) + + def _get_free_space(self) -> Optional[List[BlockSizeInfo]]: + # NOTE: parted -s will default to `cancel` on prompt, skipping any partition + # that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso, + # so the free will ignore the ESP partition and just give the "free" space. + # Doesn't harm us, but worth noting in case something weird happens. + try: + output = SysCommand(f"parted -s --machine {self._path} print free").decode('utf-8') + if output: + free_lines = [line for line in output.split('\n') if 'free' in line] + sizes = [] + for free_space in free_lines: + _, start, end, size, *_ = free_space.strip('\r\n;').split(':') + sizes.append(BlockSizeInfo(start, end, size)) + + return sizes + except SysCallError as error: + log(f"Could not get free space on {self._path}: {error}", level=logging.DEBUG) + + return None + + def _fetch_information(self) -> BlockInfo: + lsblk_info = self._call_lsblk(self._path) + device = lsblk_info['blockdevices'][0] + free_space = self._get_free_space() - for device in output['blockdevices']: - return device['pttype'] + return BlockInfo( + pttype=device['pttype'], + ptuuid=device['ptuuid'], + size=device['size'], + tran=device['tran'], + rota=device['rota'], + free_space=free_space + ) - @cached_property - def device_or_backfile(self) -> str: + @property + def _device_or_backfile(self) -> Optional[str]: """ Returns the actual device-endpoint of the BlockDevice. If it's a loop-back-device it returns the back-file, @@ -118,7 +185,7 @@ class BlockDevice: return None @property - def device(self) -> str: + def device(self) -> Optional[str]: """ Returns the device file of the BlockDevice. If it's a loop-back-device it returns the /dev/X device, @@ -126,168 +193,82 @@ class BlockDevice: And if it's a crypto-device it returns the parent device """ if "DEVTYPE" not in self.info: - raise DiskError(f'Could not locate backplane info for "{self.path}"') + raise DiskError(f'Could not locate backplane info for "{self._path}"') if self.info['DEVTYPE'] in ['disk','loop']: - return self.path + return self._path elif self.info['DEVTYPE'][:4] == 'raid': # This should catch /dev/md## raid devices - return self.path + return self._path elif self.info['DEVTYPE'] == 'crypt': if 'pkname' not in self.info: - raise DiskError(f'A crypt device ({self.path}) without a parent kernel device name.') + raise DiskError(f'A crypt device ({self._path}) without a parent kernel device name.') return f"/dev/{self.info['pkname']}" else: - log(f"Unknown blockdevice type for {self.path}: {self.info['DEVTYPE']}", level=logging.DEBUG) - - # if not stat.S_ISBLK(os.stat(full_path).st_mode): - # raise DiskError(f'Selected disk "{full_path}" is not a block device.') - - @property - def partitions(self) -> Dict[str, Partition]: - from .filesystem import Partition - - self.partprobe() - result = SysCommand(['/usr/bin/lsblk', '-J', self.path]) - - if b'not a block device' in result: - raise DiskError(f'Can not read partitions off something that isn\'t a block device: {self.path}') + log(f"Unknown blockdevice type for {self._path}: {self.info['DEVTYPE']}", level=logging.DEBUG) - if not result[:1] == b'{': - raise DiskError('Error getting JSON output from:', f'/usr/bin/lsblk -J {self.path}') - - r = json.loads(result.decode('UTF-8')) - if len(r['blockdevices']) and 'children' in r['blockdevices'][0]: - root_path = f"/dev/{r['blockdevices'][0]['name']}" - for part in r['blockdevices'][0]['children']: - part_id = part['name'][len(os.path.basename(self.path)):] - if part_id not in self.part_cache: - # TODO: Force over-write even if in cache? - if part_id not in self.part_cache or self.part_cache[part_id].size != part['size']: - self.part_cache[part_id] = Partition(root_path + part_id, block_device=self, part_id=part_id) - - return {k: self.part_cache[k] for k in sorted(self.part_cache)} + return None @property - def partition(self) -> Partition: - all_partitions = self.partitions - return [all_partitions[k] for k in all_partitions] + def partition_type(self) -> str: + return self._block_info.pttype @property - def partition_table_type(self) -> int: - # TODO: Don't hardcode :) - # Remove if we don't use this function anywhere - from .filesystem import GPT - return GPT - - @cached_property def uuid(self) -> str: - log('BlockDevice().uuid is untested!', level=logging.WARNING, fg='yellow') - """ - Returns the disk UUID as returned by lsblk. - This is more reliable than relying on /dev/disk/by-partuuid as - it doesn't seam to be able to detect md raid partitions. - """ - return SysCommand(f'blkid -s PTUUID -o value {self.path}').decode('UTF-8') - - @cached_property - def _safe_size(self) -> float: - from .helpers import convert_size_to_gb - - try: - output = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.path}").decode('UTF-8')) - except SysCallError: - return -1.0 - - for device in output['blockdevices']: - return convert_size_to_gb(device['size']) + return self._block_info.ptuuid - @cached_property + @property def size(self) -> float: from .helpers import convert_size_to_gb + return convert_size_to_gb(self._block_info.size) - output = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.path}").decode('UTF-8')) - - for device in output['blockdevices']: - return convert_size_to_gb(device['size']) - - @cached_property - def bus_type(self) -> str: - output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8')) - - for device in output['blockdevices']: - return device['tran'] + @property + def bus_type(self) -> Optional[str]: + return self._block_info.tran - @cached_property + @property def spinning(self) -> bool: - output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8')) + return self._block_info.rota - for device in output['blockdevices']: - return device['rota'] is True + @property + def partitions(self) -> Dict[str, 'Partition']: + return OrderedDict(sorted(self._partitions.items())) - @cached_property - def _safe_free_space(self) -> Tuple[str, ...]: - try: - return '+'.join(part[2] for part in self.free_space) - except SysCallError: - return '?' + @property + def partition(self) -> List['Partition']: + return list(self.partitions.values()) - @cached_property - def free_space(self) -> Tuple[str, ...]: - # NOTE: parted -s will default to `cancel` on prompt, skipping any partition - # that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso, - # so the free will ignore the ESP partition and just give the "free" space. - # Doesn't harm us, but worth noting in case something weird happens. - try: - for line in SysCommand(f"parted -s --machine {self.path} print free"): - if 'free' in (free_space := line.decode('UTF-8')): - _, start, end, size, *_ = free_space.strip('\r\n;').split(':') - yield (start, end, size) - except SysCallError as error: - log(f"Could not get free space on {self.path}: {error}", level=logging.DEBUG) - - @cached_property - def largest_free_space(self) -> List[str]: - info = [] - for space_info in self.free_space: - if not info: - info = space_info - else: - # [-1] = size - if space_info[-1] > info[-1]: - info = space_info - return info - - @cached_property + @property def first_free_sector(self) -> str: - if info := self.largest_free_space: - start = info[0] + if block_size := self._largest_free_space(): + return block_size.start else: - start = '512MB' - return start + return '512MB' - @cached_property + @property def first_end_sector(self) -> str: - if info := self.largest_free_space: - end = info[1] + if block_size := self._largest_free_space(): + return block_size.end else: - end = f"{self.size}GB" - return end - - def partprobe(self) -> bool: - return SysCommand(['partprobe', self.path]).exit_code == 0 + return f"{self.size}GB" + + def _safe_free_space(self) -> str: + if self._block_info.free_space: + sizes = [free_space.size for free_space in self._block_info.free_space] + return '+'.join(sizes) + return '?' + + def _largest_free_space(self) -> Optional[BlockSizeInfo]: + if self._block_info.free_space: + sorted_sizes = sorted(self._block_info.free_space, key=lambda x: x.size, reverse=True) + return sorted_sizes[0] + return None - def has_partitions(self) -> int: - return len(self.partitions) - - def has_mount_point(self, mountpoint :str) -> bool: - for partition in self.partitions: - if self.partitions[partition].mountpoint == mountpoint: - return True - return False + def _partprobe(self) -> bool: + return SysCommand(['partprobe', self._path]).exit_code == 0 def flush_cache(self) -> None: - self.part_cache = {} + self._load_partitions() def get_partition(self, uuid :Optional[str] = None, partuuid :Optional[str] = None) -> Partition: if not uuid and not partuuid: @@ -296,9 +277,9 @@ class BlockDevice: for count in range(storage.get('DISK_RETRY_ATTEMPTS', 5)): for partition_index, partition in self.partitions.items(): try: - if uuid and partition.uuid.lower() == uuid.lower(): + if uuid and partition.uuid and partition.uuid.lower() == uuid.lower(): return partition - elif partuuid and partition.part_uuid.lower() == partuuid.lower(): + elif partuuid and partition.part_uuid and partition.part_uuid.lower() == partuuid.lower(): return partition except DiskError as error: # Most likely a blockdevice that doesn't support or use UUID's @@ -307,9 +288,10 @@ class BlockDevice: pass log(f"uuid {uuid} or {partuuid} not found. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s for next attempt",level=logging.DEBUG) + self.flush_cache() time.sleep(storage.get('DISK_TIMEOUTS', 1) * count) - + log(f"Could not find {uuid}/{partuuid} in disk after 5 retries", level=logging.INFO) - log(f"Cache: {self.part_cache}") + log(f"Cache: {self._partitions}") log(f"Partitions: {self.partitions.items()}") raise DiskError(f"Partition {uuid}/{partuuid} was never found on {self} despite several attempts.") diff --git a/archinstall/lib/disk/btrfs/__init__.py b/archinstall/lib/disk/btrfs/__init__.py index 84b9c0f6..a26e0160 100644 --- a/archinstall/lib/disk/btrfs/__init__.py +++ b/archinstall/lib/disk/btrfs/__init__.py @@ -2,8 +2,7 @@ from __future__ import annotations import pathlib import glob import logging -import re -from typing import Union, Dict, TYPE_CHECKING, Any, Iterator +from typing import Union, Dict, TYPE_CHECKING # https://stackoverflow.com/a/39757388/929999 if TYPE_CHECKING: @@ -15,30 +14,15 @@ from .btrfs_helpers import ( setup_subvolumes as setup_subvolumes, mount_subvolume as mount_subvolume ) -from .btrfssubvolume import BtrfsSubvolume as BtrfsSubvolume +from .btrfssubvolumeinfo import BtrfsSubvolumeInfo as BtrfsSubvolume from .btrfspartition import BTRFSPartition as BTRFSPartition -from ..helpers import get_mount_info from ...exceptions import DiskError, Deprecated from ...general import SysCommand from ...output import log -from ...exceptions import SysCallError -def get_subvolume_info(path :pathlib.Path) -> Dict[str, Any]: - try: - output = SysCommand(f"btrfs subvol show {path}").decode() - except SysCallError as error: - print('Error:', error) - result = {} - for line in output.replace('\r\n', '\n').split('\n'): - if ':' in line: - key, val = line.replace('\t', '').split(':', 1) - result[key.strip().lower().replace(' ', '_')] = val.strip() - - return result - -def create_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str]) -> bool: +def create_subvolume(installation: Installer, subvolume_location :Union[pathlib.Path, str]) -> bool: """ This function uses btrfs to create a subvolume. @@ -70,113 +54,3 @@ def create_subvolume(installation :Installer, subvolume_location :Union[pathlib. log(f"Creating a subvolume on {target}", level=logging.INFO) if (cmd := SysCommand(f"btrfs subvolume create {target}")).exit_code != 0: raise DiskError(f"Could not create a subvolume at {target}: {cmd}") - -def _has_option(option :str,options :list) -> bool: - """ auxiliary routine to check if an option is present in a list. - we check if the string appears in one of the options, 'cause it can appear in severl forms (option, option=val,...) - """ - if not options: - return False - - for item in options: - if option in item: - return True - - return False - -def manage_btrfs_subvolumes(installation :Installer, - partition :Dict[str, str],) -> list: - - raise Deprecated("Use setup_subvolumes() instead.") - - from copy import deepcopy - """ we do the magic with subvolumes in a centralized place - parameters: - * the installation object - * the partition dictionary entry which represents the physical partition - returns - * mountpoinst, the list which contains all the "new" partititon to be mounted - - We expect the partition has been mounted as / , and it to be unmounted after the processing - Then we create all the subvolumes inside btrfs as demand - We clone then, both the partition dictionary and the object inside it and adapt it to the subvolume needs - Then we return a list of "new" partitions to be processed as "normal" partitions - # TODO For encrypted devices we need some special processing prior to it - """ - # We process each of the pairs <subvolume name: mount point | None | mount info dict> - # th mount info dict has an entry for the path of the mountpoint (named 'mountpoint') and 'options' which is a list - # of mount options (or similar used by brtfs) - mountpoints = [] - subvolumes = partition['btrfs']['subvolumes'] - for name, right_hand in subvolumes.items(): - try: - # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load - every subvolume is created from the top of the hierarchy- and simplifies its further use - if name.startswith('/'): - name = name[1:] - # renormalize the right hand. - location = None - subvol_options = [] - # no contents, so it is not to be mounted - if not right_hand: - location = None - # just a string. per backward compatibility the mount point - elif isinstance(right_hand,str): - location = right_hand - # a dict. two elements 'mountpoint' (obvious) and and a mount options list ¿? - elif isinstance(right_hand,dict): - location = right_hand.get('mountpoint',None) - subvol_options = right_hand.get('options',[]) - # we create the subvolume - create_subvolume(installation,name) - # Make the nodatacow processing now - # It will be the main cause of creation of subvolumes which are not to be mounted - # it is not an options which can be established by subvolume (but for whole file systems), and can be - # set up via a simple attribute change in a directory (if empty). And here the directories are brand new - if 'nodatacow' in subvol_options: - if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0: - raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}") - # entry is deleted so nodatacow doesn't propagate to the mount options - del subvol_options[subvol_options.index('nodatacow')] - # Make the compress processing now - # it is not an options which can be established by subvolume (but for whole file systems), and can be - # set up via a simple attribute change in a directory (if empty). And here the directories are brand new - # in this way only zstd compression is activaded - # TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated - if 'compress' in subvol_options: - if not _has_option('compress',partition.get('filesystem',{}).get('mount_options',[])): - if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0: - raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}") - # entry is deleted so compress doesn't propagate to the mount options - del subvol_options[subvol_options.index('compress')] - # END compress processing. - # we do not mount if THE basic partition will be mounted or if we exclude explicitly this subvolume - if not partition['mountpoint'] and location is not None: - # we begin to create a fake partition entry. First we copy the original -the one that corresponds to - # the primary partition. We make a deepcopy to avoid altering the original content in any case - fake_partition = deepcopy(partition) - # we start to modify entries in the "fake partition" to match the needs of the subvolumes - # to avoid any chance of entering in a loop (not expected) we delete the list of subvolumes in the copy - del fake_partition['btrfs'] - fake_partition['encrypted'] = False - fake_partition['generate-encryption-key-file'] = False - # Mount destination. As of now the right hand part - fake_partition['mountpoint'] = location - # we load the name in an attribute called subvolume, but i think it is not needed anymore, 'cause the mount logic uses a different path. - fake_partition['subvolume'] = name - # here we add the special mount options for the subvolume, if any. - # if the original partition['options'] is not a list might give trouble - if fake_partition.get('filesystem',{}).get('mount_options',[]): - fake_partition['filesystem']['mount_options'].extend(subvol_options) - else: - fake_partition['filesystem']['mount_options'] = subvol_options - # Here comes the most exotic part. The dictionary attribute 'device_instance' contains an instance of Partition. This instance will be queried along the mount process at the installer. - # As the rest will query there the path of the "partition" to be mounted, we feed it with the bind name needed to mount subvolumes - # As we made a deepcopy we have a fresh instance of this object we can manipulate problemless - fake_partition['device_instance'].path = f"{partition['device_instance'].path}[/{name}]" - - # Well, now that this "fake partition" is ready, we add it to the list of the ones which are to be mounted, - # as "normal" ones - mountpoints.append(fake_partition) - except Exception as e: - raise e - return mountpoints diff --git a/archinstall/lib/disk/btrfs/btrfs_helpers.py b/archinstall/lib/disk/btrfs/btrfs_helpers.py index d577d82b..f6d2734a 100644 --- a/archinstall/lib/disk/btrfs/btrfs_helpers.py +++ b/archinstall/lib/disk/btrfs/btrfs_helpers.py @@ -1,72 +1,73 @@ -import pathlib import logging -from typing import Optional +import re +from pathlib import Path +from typing import Optional, Dict, Any, TYPE_CHECKING +from ...models.subvolume import Subvolume from ...exceptions import SysCallError, DiskError from ...general import SysCommand from ...output import log +from ...plugins import plugins from ..helpers import get_mount_info -from .btrfssubvolume import BtrfsSubvolume +from .btrfssubvolumeinfo import BtrfsSubvolumeInfo +if TYPE_CHECKING: + from .btrfspartition import BTRFSPartition + from ...installer import Installer -def mount_subvolume(installation, device, name, subvolume_information): - # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load. - # Every subvolume is created from the top of the hierarchy- and simplifies its further use - name = name.lstrip('/') - # renormalize the right hand. - mountpoint = subvolume_information.get('mountpoint', None) - if not mountpoint: - return None +class fstab_btrfs_compression_plugin(): + def __init__(self, partition_dict): + self.partition_dict = partition_dict + + def on_genfstab(self, installation): + with open(f"{installation.target}/etc/fstab", 'r') as fh: + fstab = fh.read() + + # Replace the {installation}/etc/fstab with entries + # using the compress=zstd where the mountpoint has compression set. + with open(f"{installation.target}/etc/fstab", 'w') as fh: + for line in fstab.split('\n'): + # So first we grab the mount options by using subvol=.*? as a locator. + # And we also grab the mountpoint for the entry, for instance /var/log + if (subvoldef := re.findall(',.*?subvol=.*?[\t ]', line)) and (mountpoint := re.findall('[\t ]/.*?[\t ]', line)): + for subvolume in self.partition_dict.get('btrfs', {}).get('subvolumes', []): + # We then locate the correct subvolume and check if it's compressed + if subvolume.compress and subvolume.mountpoint == mountpoint[0].strip(): + # We then sneak in the compress=zstd option if it doesn't already exist: + # We skip entries where compression is already defined + if ',compress=zstd,' not in line: + line = line.replace(subvoldef[0], f",compress=zstd{subvoldef[0]}") + break + + fh.write(f"{line}\n") - if type(mountpoint) == str: - mountpoint = pathlib.Path(mountpoint) + return True - installation_target = installation.target - if type(installation_target) == str: - installation_target = pathlib.Path(installation_target) + +def mount_subvolume(installation: 'Installer', device: 'BTRFSPartition', subvolume: Subvolume): + # we normalize the subvolume name (getting rid of slash at the start if exists. + # In our implementation has no semantic load. + # Every subvolume is created from the top of the hierarchy- and simplifies its further use + name = subvolume.name.lstrip('/') + mountpoint = Path(subvolume.mountpoint) + installation_target = Path(installation.target) mountpoint = installation_target / mountpoint.relative_to(mountpoint.anchor) mountpoint.mkdir(parents=True, exist_ok=True) - - mount_options = subvolume_information.get('options', []) - if not any('subvol=' in x for x in mount_options): - mount_options += [f'subvol={name}'] + mount_options = subvolume.options + [f'subvol={name}'] log(f"Mounting subvolume {name} on {device} to {mountpoint}", level=logging.INFO, fg="gray") SysCommand(f"mount {device.path} {mountpoint} -o {','.join(mount_options)}") -def setup_subvolumes(installation, partition_dict): - """ - Taken from: ..user_guides.py - - partition['btrfs'] = { - "subvolumes" : { - "@": "/", - "@home": "/home", - "@log": "/var/log", - "@pkg": "/var/cache/pacman/pkg", - "@.snapshots": "/.snapshots" - } - } - """ +def setup_subvolumes(installation: 'Installer', partition_dict: Dict[str, Any]): log(f"Setting up subvolumes: {partition_dict['btrfs']['subvolumes']}", level=logging.INFO, fg="gray") - for name, right_hand in partition_dict['btrfs']['subvolumes'].items(): - # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load. - # Every subvolume is created from the top of the hierarchy- and simplifies its further use - name = name.lstrip('/') - # renormalize the right hand. - # mountpoint = None - subvol_options = [] - - match right_hand: - # case str(): # backwards-compatability - # mountpoint = right_hand - case dict(): - # mountpoint = right_hand.get('mountpoint', None) - subvol_options = right_hand.get('options', []) + for subvolume in partition_dict['btrfs']['subvolumes']: + # we normalize the subvolume name (getting rid of slash at the start if exists. In our implementation has no semantic load. + # Every subvolume is created from the top of the hierarchy- and simplifies its further use + name = subvolume.name.lstrip('/') # We create the subvolume using the BTRFSPartition instance. # That way we ensure not only easy access, but also accurate mount locations etc. @@ -76,27 +77,28 @@ def setup_subvolumes(installation, partition_dict): # It will be the main cause of creation of subvolumes which are not to be mounted # it is not an options which can be established by subvolume (but for whole file systems), and can be # set up via a simple attribute change in a directory (if empty). And here the directories are brand new - if 'nodatacow' in subvol_options: + if subvolume.nodatacow: if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0: raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}") - # entry is deleted so nodatacow doesn't propagate to the mount options - del subvol_options[subvol_options.index('nodatacow')] + # Make the compress processing now # it is not an options which can be established by subvolume (but for whole file systems), and can be # set up via a simple attribute change in a directory (if empty). And here the directories are brand new # in this way only zstd compression is activaded # TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated - if 'compress' in subvol_options: + if subvolume.compress: if not any(['compress' in filesystem_option for filesystem_option in partition_dict.get('filesystem', {}).get('mount_options', [])]): if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0: raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}") - # entry is deleted so compress doesn't propagate to the mount options - del subvol_options[subvol_options.index('compress')] -def subvolume_info_from_path(path :pathlib.Path) -> Optional[BtrfsSubvolume]: + if 'fstab_btrfs_compression_plugin' not in plugins: + plugins['fstab_btrfs_compression_plugin'] = fstab_btrfs_compression_plugin(partition_dict) + + +def subvolume_info_from_path(path: Path) -> Optional[BtrfsSubvolumeInfo]: try: - subvolume_name = None + subvolume_name = '' result = {} for index, line in enumerate(SysCommand(f"btrfs subvolume show {path}")): if index == 0: @@ -110,14 +112,14 @@ def subvolume_info_from_path(path :pathlib.Path) -> Optional[BtrfsSubvolume]: # allows for hooking in a pre-processor to do this we have to do it here: result[key.lower().replace(' ', '_').replace('(s)', 's')] = value.strip() - return BtrfsSubvolume(**{'full_path' : path, 'name' : subvolume_name, **result}) - + return BtrfsSubvolumeInfo(**{'full_path' : path, 'name' : subvolume_name, **result}) # type: ignore except SysCallError as error: log(f"Could not retrieve subvolume information from {path}: {error}", level=logging.WARNING, fg="orange") return None -def find_parent_subvolume(path :pathlib.Path, filters=[]): + +def find_parent_subvolume(path: Path, filters=[]) -> Optional[BtrfsSubvolumeInfo]: # A root path cannot have a parent if str(path) == '/': return None @@ -127,6 +129,8 @@ def find_parent_subvolume(path :pathlib.Path, filters=[]): if found_mount['target'] == '/': return None - return find_parent_subvolume(path.parent, traverse=True, filters=[*filters, found_mount['target']]) + return find_parent_subvolume(path.parent, filters=[*filters, found_mount['target']]) - return subvolume
\ No newline at end of file + return subvolume + + return None diff --git a/archinstall/lib/disk/btrfs/btrfspartition.py b/archinstall/lib/disk/btrfs/btrfspartition.py index 5020133d..d04c9b98 100644 --- a/archinstall/lib/disk/btrfs/btrfspartition.py +++ b/archinstall/lib/disk/btrfs/btrfspartition.py @@ -15,24 +15,13 @@ from .btrfs_helpers import ( if TYPE_CHECKING: from ...installer import Installer - from .btrfssubvolume import BtrfsSubvolume + from .btrfssubvolumeinfo import BtrfsSubvolumeInfo + class BTRFSPartition(Partition): def __init__(self, *args, **kwargs): Partition.__init__(self, *args, **kwargs) - def __repr__(self, *args :str, **kwargs :str) -> str: - mount_repr = '' - if self.mountpoint: - mount_repr = f", mounted={self.mountpoint}" - elif self.target_mountpoint: - mount_repr = f", rel_mountpoint={self.target_mountpoint}" - - if self._encrypted: - return f'BTRFSPartition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, parent={self.real_device}, fs={self.filesystem}{mount_repr})' - else: - return f'BTRFSPartition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})' - @property def subvolumes(self): for filesystem in findmnt(pathlib.Path(self.path), recurse=True).get('filesystems', []): @@ -40,17 +29,17 @@ class BTRFSPartition(Partition): yield subvolume_info_from_path(filesystem['target']) def iterate_children(struct): - for child in struct.get('children', []): + for c in struct.get('children', []): if '[' in child.get('source', ''): - yield subvolume_info_from_path(child['target']) + yield subvolume_info_from_path(c['target']) - for sub_child in iterate_children(child): + for sub_child in iterate_children(c): yield sub_child for child in iterate_children(filesystem): yield child - def create_subvolume(self, subvolume :pathlib.Path, installation :Optional['Installer'] = None) -> 'BtrfsSubvolume': + def create_subvolume(self, subvolume :pathlib.Path, installation :Optional['Installer'] = None) -> 'BtrfsSubvolumeInfo': """ Subvolumes have to be created within a mountpoint. This means we need to get the current installation target. @@ -62,13 +51,13 @@ class BTRFSPartition(Partition): if not installation: installation = storage.get('installation_session') - # Determain if the path given, is an absolute path or a releative path. + # Determain if the path given, is an absolute path or a relative path. # We do this by checking if the path contains a known mountpoint. if str(subvolume)[0] == '/': if filesystems := findmnt(subvolume, traverse=True).get('filesystems'): if (target := filesystems[0].get('target')) and target != '/' and str(subvolume).startswith(target): # Path starts with a known mountpoint which isn't / - # Which means it's an absolut path to a mounted location. + # Which means it's an absolute path to a mounted location. pass else: # Since it's not an absolute position with a known start. @@ -108,9 +97,13 @@ class BTRFSPartition(Partition): if glob.glob(str(subvolume / '*')): raise DiskError(f"Cannot create subvolume at {subvolume} because it contains data (non-empty folder target is not supported by BTRFS)") - elif subvolinfo := subvolume_info_from_path(subvolume): - raise DiskError(f"Destination {subvolume} is already a subvolume: {subvolinfo}") + # Ideally we would like to check if the destination is already a subvolume. + # But then we would need the mount-point at this stage as well. + # So we'll comment out this check: + # elif subvolinfo := subvolume_info_from_path(subvolume): + # raise DiskError(f"Destination {subvolume} is already a subvolume: {subvolinfo}") + # And deal with it here: SysCommand(f"btrfs subvolume create {subvolume}") - return subvolume_info_from_path(subvolume)
\ No newline at end of file + return subvolume_info_from_path(subvolume) diff --git a/archinstall/lib/disk/btrfs/btrfssubvolume.py b/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py index a96e2a94..5f5bdea6 100644 --- a/archinstall/lib/disk/btrfs/btrfssubvolume.py +++ b/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py @@ -16,8 +16,9 @@ from ...general import SysCommand from ...output import log from ...storage import storage + @dataclass -class BtrfsSubvolume: +class BtrfsSubvolumeInfo: full_path :pathlib.Path name :str uuid :str @@ -68,9 +69,9 @@ class BtrfsSubvolume: from .btrfs_helpers import subvolume_info_from_path # TODO: Make this function traverse storage['MOUNT_POINT'] and find the first - # occurance of a mountpoint that is a btrfs volume instead of lazy assume / is a subvolume. + # occurrence of a mountpoint that is a btrfs volume instead of lazy assume / is a subvolume. # It would also be nice if it could use findmnt(self.full_path) and traverse backwards - # finding the last occurance of a subvolume which 'self' belongs to. + # finding the last occurrence of a subvolume which 'self' belongs to. if volume := subvolume_info_from_path(storage['MOUNT_POINT']): return self.full_path == volume.full_path @@ -188,4 +189,4 @@ class BtrfsSubvolume: def unmount(self, recurse :bool = True): SysCommand(f"umount {'-R' if recurse else ''} {self.full_path}") - log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray")
\ No newline at end of file + log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray") diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py index f94b4b47..5d5952a0 100644 --- a/archinstall/lib/disk/filesystem.py +++ b/archinstall/lib/disk/filesystem.py @@ -74,7 +74,7 @@ class Filesystem: raise KeyError(f"Could not create a GPT label on {self}") elif self.mode == MBR: if not self.parted_mklabel(self.blockdevice.device, "msdos"): - raise KeyError(f"Could not create a MSDOS label on {self}") + raise KeyError(f"Could not create a MS-DOS label on {self}") self.blockdevice.flush_cache() time.sleep(3) @@ -100,7 +100,7 @@ class Filesystem: partition['device_instance'] = self.blockdevice.get_partition(uuid=partition_uuid) except DiskError: partition['device_instance'] = self.blockdevice.get_partition(partuuid=partition_uuid) - + log(_("Re-using partition instance: {}").format(partition['device_instance']), level=logging.DEBUG, fg="gray") else: log(f"{self}.load_layout() doesn't know how to work without 'wipe' being set or UUID ({partition.get('PARTUUID')}) was given and found.", fg="yellow", level=logging.WARNING) @@ -210,7 +210,14 @@ class Filesystem: # TODO: Implement this with declarative profiles instead. raise ValueError("Installation().use_entire_disk() has to be re-worked.") - def add_partition(self, partition_type :str, start :str, end :str, partition_format :Optional[str] = None, skip_mklabel :bool = False) -> Partition: + def add_partition( + self, + partition_type :str, + start :str, + end :str, + partition_format :Optional[str] = None, + skip_mklabel :bool = False + ) -> Partition: log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO) if len(self.blockdevice.partitions) == 0 and skip_mklabel is False: @@ -221,7 +228,7 @@ class Filesystem: raise KeyError(f"Could not create a GPT label on {self}") elif self.mode == MBR: if not self.parted_mklabel(self.blockdevice.device, "msdos"): - raise KeyError(f"Could not create a MSDOS label on {self}") + raise KeyError(f"Could not create a MS-DOS label on {self}") self.blockdevice.flush_cache() @@ -232,6 +239,7 @@ class Filesystem: except DiskError: pass + # TODO this check should probably run in the setup process rather than during the installation if self.mode == MBR: if len(self.blockdevice.partitions) > 3: DiskError("Too many partitions on disk, MBR disks can only have 3 primary partitions") @@ -245,15 +253,9 @@ class Filesystem: if self.parted(parted_string): for count in range(storage.get('DISK_RETRY_ATTEMPTS', 3)): - self.partprobe() - - new_partition_uuids = [] - for partition in self.blockdevice.partitions.values(): - try: - new_partition_uuids.append(partition.part_uuid) - except DiskError: - pass + self.blockdevice.flush_cache() + new_partition_uuids = [partition.part_uuid for partition in self.blockdevice.partitions.values()] new_partuuid_set = (set(previous_partuuids) ^ set(new_partition_uuids)) if len(new_partuuid_set) and (new_partuuid := new_partuuid_set.pop()): @@ -263,17 +265,23 @@ class Filesystem: log(f'Blockdevice: {self.blockdevice}', level=logging.ERROR, fg="red") log(f'Partitions: {self.blockdevice.partitions}', level=logging.ERROR, fg="red") log(f'Partition set: {new_partuuid_set}', level=logging.ERROR, fg="red") - log(f'New UUID: {[new_partuuid]}', level=logging.ERROR, fg="red") + log(f'New PARTUUID: {[new_partuuid]}', level=logging.ERROR, fg="red") log(f'get_partition(): {self.blockdevice.get_partition}', level=logging.ERROR, fg="red") raise err else: log(f"Could not get UUID for partition. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s before retrying.",level=logging.DEBUG) - time.sleep(storage.get('DISK_TIMEOUTS', 1) * count) + self.partprobe() + time.sleep(max(0.1, storage.get('DISK_TIMEOUTS', 1))) + else: + print("Parted did not return True during partition creation") + + total_partitions = set([partition.part_uuid for partition in self.blockdevice.partitions.values()]) + total_partitions.update(previous_partuuids) # TODO: This should never be able to happen log(f"Could not find the new PARTUUID after adding the partition.", level=logging.ERROR, fg="red") log(f"Previous partitions: {previous_partuuids}", level=logging.ERROR, fg="red") - log(f"New partitions: {(previous_partuuids ^ {partition.part_uuid for partition in self.blockdevice.partitions.values()})}", level=logging.ERROR, fg="red") + log(f"New partitions: {total_partitions}", level=logging.ERROR, fg="red") raise DiskError(f"Could not add partition using: {parted_string}") def set_name(self, partition: int, name: str) -> bool: diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py index 99856aad..f19125f4 100644 --- a/archinstall/lib/disk/helpers.py +++ b/archinstall/lib/disk/helpers.py @@ -8,6 +8,8 @@ import time import glob from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING # https://stackoverflow.com/a/39757388/929999 +from ..models.subvolume import Subvolume + if TYPE_CHECKING: from .partition import Partition @@ -112,7 +114,7 @@ def cleanup_bash_escapes(data :str) -> str: def blkid(cmd :str) -> Dict[str, Any]: if '-o' in cmd and '-o export' not in cmd: - raise ValueError(f"blkid() requires '-o export' to be used and can therefor not continue reliably.") + raise ValueError(f"blkid() requires '-o export' to be used and can therefore not continue reliably.") elif '-o' not in cmd: cmd += ' -o export' @@ -133,7 +135,7 @@ def blkid(cmd :str) -> Dict[str, Any]: key, val = line.split('=', 1) if key.lower() == 'devname': devname = val - # Lowercase for backwards compatability with all_disks() previous use cases + # Lowercase for backwards compatibility with all_disks() previous use cases result[devname] = { "path": devname, "PATH": devname @@ -218,7 +220,12 @@ def all_blockdevices(mappers=False, partitions=False, error=False) -> Dict[str, # we'll iterate the /sys/class definitions and find the information # from there. for block_device in glob.glob("/sys/class/block/*"): - device_path = f"/dev/{pathlib.Path(block_device).readlink().name}" + device_path = pathlib.Path(f"/dev/{pathlib.Path(block_device).readlink().name}") + + if device_path.exists() is False: + log(f"Unknown device found by '/sys/class/block/*', ignoring: {device_path}", level=logging.WARNING, fg="yellow") + continue + try: information = blkid(f'blkid -p -o export {device_path}') except SysCallError as ex: @@ -227,12 +234,17 @@ def all_blockdevices(mappers=False, partitions=False, error=False) -> Dict[str, try: information = get_loop_info(device_path) if not information: + print("Exit code for blkid -p -o export was:", ex.exit_code) raise SysCallError("Could not get loop information", exit_code=1) except SysCallError: + print("Not a loop device, trying uevent rules.") information = get_blockdevice_uevent(pathlib.Path(block_device).readlink().name) else: + # We could not reliably get any information, perhaps the disk is clean of information? + print("Raising ex because:", ex.exit_code) raise ex + # return instances information = enrich_blockdevice_information(information) @@ -244,7 +256,7 @@ def all_blockdevices(mappers=False, partitions=False, error=False) -> Dict[str, instances[path] = Partition(path, block_device=BlockDevice(get_parent_of_partition(pathlib.Path(path)))) elif path_info.get('PTTYPE', False) is not False or path_info.get('TYPE') == 'loop': instances[path] = BlockDevice(path, path_info) - elif path_info.get('TYPE') == 'squashfs': + elif path_info.get('TYPE') in ('squashfs', 'erofs'): # We can ignore squashfs devices (usually /dev/loop0 on Arch ISO) continue else: @@ -368,7 +380,7 @@ def get_all_targets(data :Dict[str, Any], filters :Dict[str, None] = {}) -> Dict return filters -def get_partitions_in_use(mountpoint :str) -> List[Partition]: +def get_partitions_in_use(mountpoint :str) -> Dict[str, Any]: from .partition import Partition try: @@ -391,12 +403,20 @@ def get_partitions_in_use(mountpoint :str) -> List[Partition]: if not type(blockdev) in (Partition, MapperDev): continue - for blockdev_mountpoint in blockdev.mount_information: - block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev + if isinstance(blockdev, Partition): + for blockdev_mountpoint in blockdev.mountpoints: + block_devices_mountpoints[blockdev_mountpoint] = blockdev + else: + for blockdev_mountpoint in blockdev.mount_information: + block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev log(f'Filtering available mounts {block_devices_mountpoints} to those under {mountpoint}', level=logging.DEBUG) for mountpoint in list(get_all_targets(output['filesystems']).keys()): + # Since all_blockdevices() returns PosixPath objects, we need to convert + # findmnt paths to pathlib.Path() first: + mountpoint = pathlib.Path(mountpoint) + if mountpoint in block_devices_mountpoints: if mountpoint not in mounts: mounts[mountpoint] = block_devices_mountpoints[mountpoint] @@ -433,9 +453,10 @@ def disk_layouts() -> Optional[Dict[str, Any]]: def encrypted_partitions(blockdevices :Dict[str, Any]) -> bool: - for partition in blockdevices.values(): - if partition.get('encrypted', False): - yield partition + for blockdevice in blockdevices.values(): + for partition in blockdevice.get('partitions', []): + if partition.get('encrypted', False): + yield partition def find_partition_by_mountpoint(block_devices :List[BlockDevice], relative_mountpoint :str) -> Partition: for device in block_devices: @@ -468,13 +489,14 @@ def convert_device_to_uuid(path :str) -> str: raise DiskError(f"Could not retrieve the UUID of {path} within a timely manner.") + def has_mountpoint(partition: Union[dict,Partition,MapperDev], target: str, strict: bool = True) -> bool: """ Determine if a certain partition is mounted (or has a mountpoint) as specific target (path) Coded for clarity rather than performance Input parms: :parm partition the partition we check - :type Either a Partition object or a dict with the contents of a partition definiton in the disk_layouts schema + :type Either a Partition object or a dict with the contents of a partition definition in the disk_layouts schema :parm target (a string representing a mount path we want to check for. :type str @@ -484,10 +506,12 @@ def has_mountpoint(partition: Union[dict,Partition,MapperDev], target: str, stri """ # we create the mountpoint list if isinstance(partition,dict): - subvols = partition.get('btrfs',{}).get('subvolumes',{}) - mountpoints = [partition.get('mountpoint'),] + [subvols[subvol] if isinstance(subvols[subvol],str) or not subvols[subvol] else subvols[subvol].get('mountpoint') for subvol in subvols] + subvolumes: List[Subvolume] = partition.get('btrfs',{}).get('subvolumes', []) + mountpoints = [partition.get('mountpoint')] + mountpoints += [volume.mountpoint for volume in subvolumes] else: mountpoints = [partition.mountpoint,] + [subvol.target for subvol in partition.subvolumes] + # we check if strict or target == '/': if target in mountpoints: diff --git a/archinstall/lib/disk/mapperdev.py b/archinstall/lib/disk/mapperdev.py index 913dbc13..71ef2a79 100644 --- a/archinstall/lib/disk/mapperdev.py +++ b/archinstall/lib/disk/mapperdev.py @@ -10,7 +10,7 @@ from ..general import SysCommand from ..output import log if TYPE_CHECKING: - from .btrfs import BtrfsSubvolume + from .btrfs import BtrfsSubvolumeInfo @dataclass class MapperDev: @@ -37,12 +37,12 @@ class MapperDev: for slave in glob.glob(f"/sys/class/block/{dm_device.name}/slaves/*"): partition_belonging_to_dmcrypt_device = pathlib.Path(slave).name - + try: uevent_data = SysCommand(f"blkid -o export /dev/{partition_belonging_to_dmcrypt_device}").decode() except SysCallError as error: log(f"Could not get information on device /dev/{partition_belonging_to_dmcrypt_device}: {error}", level=logging.ERROR, fg="red") - + information = uevent(uevent_data) block_device = BlockDevice(get_parent_of_partition('/dev/' / pathlib.Path(information['DEVNAME']))) @@ -65,9 +65,13 @@ class MapperDev: return None @property + def mountpoints(self) -> List[Dict[str, Any]]: + return [obj['target'] for obj in self.mount_information] + + @property def mount_information(self) -> List[Dict[str, Any]]: from .helpers import find_mountpoint - return list(find_mountpoint(self.path)) + return [{**obj, 'target' : pathlib.Path(obj.get('target', '/dev/null'))} for obj in find_mountpoint(self.path)] @property def filesystem(self) -> Optional[str]: @@ -75,10 +79,10 @@ class MapperDev: return get_filesystem_type(self.path) @property - def subvolumes(self) -> Iterator['BtrfsSubvolume']: + def subvolumes(self) -> Iterator['BtrfsSubvolumeInfo']: from .btrfs import subvolume_info_from_path for mountpoint in self.mount_information: if target := mountpoint.get('target'): if subvolume := subvolume_info_from_path(pathlib.Path(target)): - yield subvolume
\ No newline at end of file + yield subvolume diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py index 73c88597..56a7d436 100644 --- a/archinstall/lib/disk/partition.py +++ b/archinstall/lib/disk/partition.py @@ -1,60 +1,83 @@ import glob -import pathlib import time import logging import json import os import hashlib +import typing +from dataclasses import dataclass +from pathlib import Path from typing import Optional, Dict, Any, List, Union, Iterator from .blockdevice import BlockDevice -from .helpers import find_mountpoint, get_filesystem_type, convert_size_to_gb, split_bind_name +from .helpers import get_filesystem_type, convert_size_to_gb, split_bind_name from ..storage import storage from ..exceptions import DiskError, SysCallError, UnknownFilesystemFormat from ..output import log from ..general import SysCommand from .btrfs.btrfs_helpers import subvolume_info_from_path -from .btrfs.btrfssubvolume import BtrfsSubvolume +from .btrfs.btrfssubvolumeinfo import BtrfsSubvolumeInfo + + +@dataclass +class PartitionInfo: + pttype: str + partuuid: str + uuid: str + start: Optional[int] + end: Optional[int] + bootable: bool + size: float + sector_size: int + filesystem_type: str + mountpoints: List[Path] + + def get_first_mountpoint(self) -> Optional[Path]: + if len(self.mountpoints) > 0: + return self.mountpoints[0] + return None + class Partition: - def __init__(self, + def __init__( + self, path: str, block_device: BlockDevice, part_id :Optional[str] = None, filesystem :Optional[str] = None, mountpoint :Optional[str] = None, encrypted :bool = False, - autodetect_filesystem :bool = True): - + autodetect_filesystem :bool = True, + ): if not part_id: part_id = os.path.basename(path) - self.block_device = block_device - if type(self.block_device) is str: + if type(block_device) is str: raise ValueError(f"Partition()'s 'block_device' parameter has to be a archinstall.BlockDevice() instance!") - self.path = path - self.part_id = part_id - self.target_mountpoint = mountpoint - self.filesystem = filesystem + self.block_device = block_device + self._path = path + self._part_id = part_id + self._target_mountpoint = mountpoint self._encrypted = None - self.encrypted = encrypted - self.allow_formatting = False + self._encrypted = encrypted + self._wipe = False + self._type = 'primary' if mountpoint: self.mount(mountpoint) - try: - self.mount_information = list(find_mountpoint(self.path)) - except DiskError: - self.mount_information = [{}] + self._partition_info = self._fetch_information() - if not self.filesystem and autodetect_filesystem: - self.filesystem = get_filesystem_type(path) + if not autodetect_filesystem and filesystem: + self._partition_info.filesystem_type = filesystem - if self.filesystem == 'crypto_LUKS': - self.encrypted = True + if self._partition_info.filesystem_type == 'crypto_LUKS': + self._encrypted = True + # I hate doint this but I'm currently unsure where this + # is acutally used to be able to fix the typing issues properly + @typing.no_type_check def __lt__(self, left_comparitor :BlockDevice) -> bool: if type(left_comparitor) == Partition: left_comparitor = left_comparitor.path @@ -62,147 +85,175 @@ class Partition: left_comparitor = str(left_comparitor) # The goal is to check if /dev/nvme0n1p1 comes before /dev/nvme0n1p5 - return self.path < left_comparitor + return self._path < left_comparitor def __repr__(self, *args :str, **kwargs :str) -> str: mount_repr = '' - if self.mountpoint: - mount_repr = f", mounted={self.mountpoint}" - elif self.target_mountpoint: - mount_repr = f", rel_mountpoint={self.target_mountpoint}" + if mountpoint := self._partition_info.get_first_mountpoint(): + mount_repr = f", mounted={mountpoint}" + elif self._target_mountpoint: + mount_repr = f", rel_mountpoint={self._target_mountpoint}" + + classname = self.__class__.__name__ if self._encrypted: - return f'Partition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, parent={self.real_device}, fs={self.filesystem}{mount_repr})' + return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, parent={self.real_device}, fs={self._partition_info.filesystem_type}{mount_repr})' else: - return f'Partition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})' + return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, fs={self._partition_info.filesystem_type}{mount_repr})' + + def as_json(self) -> Dict[str, Any]: + """ + this is used for the table representation of the partition (see FormattedOutput) + """ + partition_info = { + 'type': self._type, + 'PARTUUID': self.part_uuid, + 'wipe': self._wipe, + 'boot': self.boot, + 'ESP': self.boot, + 'mountpoint': self._target_mountpoint, + 'encrypted': self._encrypted, + 'start': self.start, + 'size': self.end, + 'filesystem': self._partition_info.filesystem_type + } + + return partition_info def __dump__(self) -> Dict[str, Any]: + # TODO remove this in favour of as_json return { - 'type': 'primary', - 'PARTUUID': self._safe_uuid, - 'wipe': self.allow_formatting, + 'type': self._type, + 'PARTUUID': self.part_uuid, + 'wipe': self._wipe, 'boot': self.boot, 'ESP': self.boot, - 'mountpoint': self.target_mountpoint, + 'mountpoint': self._target_mountpoint, 'encrypted': self._encrypted, 'start': self.start, 'size': self.end, 'filesystem': { - 'format': get_filesystem_type(self.path) + 'format': self._partition_info.filesystem_type } } - @property - def mountpoint(self) -> Optional[str]: - try: - data = json.loads(SysCommand(f"findmnt --json -R {self.path}").decode()) - for filesystem in data['filesystems']: - return pathlib.Path(filesystem.get('target')) + def _call_lsblk(self) -> Dict[str, Any]: + self.partprobe() + # This sleep might be overkill, but lsblk is known to + # work against a chaotic cache that can change during call + # causing no information to be returned (blkid is better) + # time.sleep(1) + # TODO: Maybe incorporate a re-try system here based on time.sleep(max(0.1, storage.get('DISK_TIMEOUTS', 1))) + + try: + output = SysCommand(f"lsblk --json -b -o+LOG-SEC,SIZE,PTTYPE,PARTUUID,UUID,FSTYPE {self.device_path}").decode('UTF-8') except SysCallError as error: - # Not mounted anywhere most likely - log(f"Could not locate mount information for {self.path}: {error}", level=logging.DEBUG, fg="grey") - pass + # It appears as if lsblk can return exit codes like 8192 to indicate something. + # But it does return output so we'll try to catch it. + output = error.worker.decode('UTF-8') - return None + if output: + lsblk_info = json.loads(output) + return lsblk_info - @property - def sector_size(self) -> Optional[int]: - output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.device_path}").decode('UTF-8')) + raise DiskError(f'Failed to read disk "{self.device_path}" with lsblk') - for device in output['blockdevices']: - return device.get('log-sec', None) + def _call_sfdisk(self) -> Dict[str, Any]: + output = SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8') - @property - def start(self) -> Optional[str]: - output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) + if output: + sfdisk_info = json.loads(output) + partitions = sfdisk_info.get('partitiontable', {}).get('partitions', []) + node = list(filter(lambda x: x['node'] == self._path, partitions)) - for partition in output.get('partitiontable', {}).get('partitions', []): - if partition['node'] == self.path: - return partition['start'] # * self.sector_size + if len(node) > 0: + return node[0] - @property - def end(self) -> Optional[str]: - # TODO: actually this is size in sectors unit - # TODO: Verify that the logic holds up, that 'size' is the size without 'start' added to it. - output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) + return {} + + raise DiskError(f'Failed to read disk "{self.block_device.path}" with sfdisk') + + def _fetch_information(self) -> PartitionInfo: + lsblk_info = self._call_lsblk() + sfdisk_info = self._call_sfdisk() + + if not (device := lsblk_info.get('blockdevices', [None])[0]): + raise DiskError(f'Failed to retrieve information for "{self.device_path}" with lsblk') - for partition in output.get('partitiontable', {}).get('partitions', []): - if partition['node'] == self.path: - return partition['size'] # * self.sector_size + mountpoints = [Path(mountpoint) for mountpoint in device['mountpoints'] if mountpoint] + bootable = sfdisk_info.get('bootable', False) or sfdisk_info.get('type', '') == 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B' + + return PartitionInfo( + pttype=device['pttype'], + partuuid=device['partuuid'], + uuid=device['uuid'], + sector_size=device['log-sec'], + size=convert_size_to_gb(device['size']), + start=sfdisk_info.get('start', None), + end=sfdisk_info.get('size', None), + bootable=bootable, + filesystem_type=device['fstype'], + mountpoints=mountpoints + ) @property - def end_sectors(self) -> Optional[str]: - output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) + def target_mountpoint(self) -> Optional[str]: + return self._target_mountpoint - for partition in output.get('partitiontable', {}).get('partitions', []): - if partition['node'] == self.path: - return partition['start'] + partition['size'] + @property + def path(self) -> str: + return self._path @property - def size(self) -> Optional[float]: - for i in range(storage['DISK_RETRY_ATTEMPTS']): - self.partprobe() - time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i)) + def filesystem(self) -> str: + return self._partition_info.filesystem_type - try: - lsblk = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.device_path}").decode()) + @property + def mountpoint(self) -> Optional[Path]: + if len(self.mountpoints) > 0: + return self.mountpoints[0] + return None - for device in lsblk['blockdevices']: - return convert_size_to_gb(device['size']) - except SysCallError as error: - if error.exit_code == 8192: - return None - else: - raise error + @property + def mountpoints(self) -> List[Path]: + return self._partition_info.mountpoints @property - def boot(self) -> bool: - output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) - - # Get the bootable flag from the sfdisk output: - # { - # "partitiontable": { - # "device":"/dev/loop0", - # "partitions": [ - # {"node":"/dev/loop0p1", "start":2048, "size":10483712, "type":"83", "bootable":true} - # ] - # } - # } - - for partition in output.get('partitiontable', {}).get('partitions', []): - if partition['node'] == self.path: - # first condition is for MBR disks, second for GPT disks - return partition.get('bootable', False) or partition.get('type','') == 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B' + def sector_size(self) -> int: + return self._partition_info.sector_size - return False + @property + def start(self) -> Optional[int]: + return self._partition_info.start @property - def partition_type(self) -> Optional[str]: - lsblk = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.device_path}").decode('UTF-8')) + def end(self) -> Optional[int]: + return self._partition_info.end - for device in lsblk['blockdevices']: - return device['pttype'] + @property + def end_sectors(self) -> Optional[int]: + start = self._partition_info.start + end = self._partition_info.end + if start and end: + return start + end + return None @property - def part_uuid(self) -> Optional[str]: - """ - Returns the PARTUUID as returned by lsblk. - This is more reliable than relying on /dev/disk/by-partuuid as - it doesn't seam to be able to detect md raid partitions. - For bind mounts all the subvolumes share the same uuid - """ - for i in range(storage['DISK_RETRY_ATTEMPTS']): - if not self.partprobe(): - raise DiskError(f"Could not perform partprobe on {self.device_path}") - - time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i)) + def size(self) -> Optional[float]: + return self._partition_info.size - partuuid = self._safe_part_uuid - if partuuid: - return partuuid + @property + def boot(self) -> bool: + return self._partition_info.bootable - raise DiskError(f"Could not get PARTUUID for {self.path} using 'blkid -s PARTUUID -o value {self.path}'") + @property + def partition_type(self) -> Optional[str]: + return self._partition_info.pttype + + @property + def part_uuid(self) -> str: + return self._partition_info.partuuid @property def uuid(self) -> Optional[str]: @@ -232,7 +283,7 @@ class Partition: For instance when you want to get a __repr__ of the class. """ if not self.partprobe(): - if self.block_device.info.get('TYPE') == 'iso9660': + if self.block_device.partition_type == 'iso9660': return None log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG) @@ -240,7 +291,7 @@ class Partition: try: return SysCommand(f'blkid -s UUID -o value {self.device_path}').decode('UTF-8').strip() except SysCallError as error: - if self.block_device.info.get('TYPE') == 'iso9660': + if self.block_device.partition_type == 'iso9660': # Parent device is a Optical Disk (.iso dd'ed onto a device for instance) return None @@ -254,7 +305,7 @@ class Partition: For instance when you want to get a __repr__ of the class. """ if not self.partprobe(): - if self.block_device.info.get('TYPE') == 'iso9660': + if self.block_device.partition_type == 'iso9660': return None log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG) @@ -262,37 +313,39 @@ class Partition: try: return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip() except SysCallError as error: - if self.block_device.info.get('TYPE') == 'iso9660': + if self.block_device.partition_type == 'iso9660': # Parent device is a Optical Disk (.iso dd'ed onto a device for instance) return None log(f"Could not get PARTUUID of partition using 'blkid -s PARTUUID -o value {self.device_path}': {error}") + return self._partition_info.uuid + @property def encrypted(self) -> Union[bool, None]: return self._encrypted - @encrypted.setter - def encrypted(self, value: bool) -> None: - self._encrypted = value - @property def parent(self) -> str: return self.real_device @property def real_device(self) -> str: - for blockdevice in json.loads(SysCommand('lsblk -J').decode('UTF-8'))['blockdevices']: - if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)): - return f"/dev/{parent}" - # raise DiskError(f'Could not find appropriate parent for encrypted partition {self}') - return self.path + output = SysCommand('lsblk -J').decode('UTF-8') + + if output: + for blockdevice in json.loads(output)['blockdevices']: + if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)): + return f"/dev/{parent}" + return self._path + + raise DiskError('Unable to get disk information for command "lsblk -J"') @property def device_path(self) -> str: - """ for bind mounts returns the phisical path of the partition + """ for bind mounts returns the physical path of the partition """ - device_path, bind_name = split_bind_name(self.path) + device_path, bind_name = split_bind_name(self._path) return device_path @property @@ -300,38 +353,40 @@ class Partition: """ for bind mounts returns the bind name (subvolume path). Returns none if this property does not exist """ - device_path, bind_name = split_bind_name(self.path) + device_path, bind_name = split_bind_name(self._path) return bind_name @property - def subvolumes(self) -> Iterator[BtrfsSubvolume]: + def subvolumes(self) -> Iterator[BtrfsSubvolumeInfo]: from .helpers import findmnt - + def iterate_children_recursively(information): for child in information.get('children', []): if target := child.get('target'): - if subvolume := subvolume_info_from_path(pathlib.Path(target)): - yield subvolume + if child.get('fstype') == 'btrfs': + if subvolume := subvolume_info_from_path(Path(target)): + yield subvolume if child.get('children'): for subchild in iterate_children_recursively(child): yield subchild - for mountpoint in self.mount_information: - if result := findmnt(pathlib.Path(mountpoint['target'])): - for filesystem in result.get('filesystems', []): - if subvolume := subvolume_info_from_path(pathlib.Path(mountpoint['target'])): - yield subvolume + if self._partition_info.filesystem_type == 'btrfs': + for mountpoint in self._partition_info.mountpoints: + if result := findmnt(mountpoint): + for filesystem in result.get('filesystems', []): + if subvolume := subvolume_info_from_path(mountpoint): + yield subvolume - for child in iterate_children_recursively(filesystem): - yield child + for child in iterate_children_recursively(filesystem): + yield child def partprobe(self) -> bool: try: if self.block_device: return 0 == SysCommand(f'partprobe {self.block_device.device}').exit_code except SysCallError as error: - log(f"Unreliable results might be given for {self.path} due to partprobe error: {error}", level=logging.DEBUG) + log(f"Unreliable results might be given for {self._path} due to partprobe error: {error}", level=logging.DEBUG) return False @@ -343,19 +398,20 @@ class Partition: with luks2(self, storage.get('ENC_IDENTIFIER', 'ai') + 'loop', password, auto_unmount=True) as unlocked_device: return unlocked_device.filesystem except SysCallError: - return None + pass + return None def has_content(self) -> bool: - fs_type = get_filesystem_type(self.path) + fs_type = self._partition_info.filesystem_type if not fs_type or "swap" in fs_type: return False temporary_mountpoint = '/tmp/' + hashlib.md5(bytes(f"{time.time()}", 'UTF-8') + os.urandom(12)).hexdigest() - temporary_path = pathlib.Path(temporary_mountpoint) + temporary_path = Path(temporary_mountpoint) temporary_path.mkdir(parents=True, exist_ok=True) - if (handle := SysCommand(f'/usr/bin/mount {self.path} {temporary_mountpoint}')).exit_code != 0: - raise DiskError(f'Could not mount and check for content on {self.path} because: {b"".join(handle)}') + if (handle := SysCommand(f'/usr/bin/mount {self._path} {temporary_mountpoint}')).exit_code != 0: + raise DiskError(f'Could not mount and check for content on {self._path} because: {handle}') files = len(glob.glob(f"{temporary_mountpoint}/*")) iterations = 0 @@ -366,14 +422,14 @@ class Partition: return True if files > 0 else False - def encrypt(self, *args :str, **kwargs :str) -> str: + def encrypt(self, password: Optional[str] = None) -> str: """ A wrapper function for luks2() instances and the .encrypt() method of that instance. """ from ..luks import luks2 handle = luks2(self, None, None) - return handle.encrypt(self, *args, **kwargs) + return handle.encrypt(self, password=password) def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = [], retry :bool = True) -> bool: """ @@ -381,17 +437,17 @@ class Partition: the formatting functionality and in essence the support for the given filesystem. """ if filesystem is None: - filesystem = self.filesystem + filesystem = self._partition_info.filesystem_type if path is None: - path = self.path + path = self._path # This converts from fat32 -> vfat to unify filesystem names filesystem = get_mount_fs_type(filesystem) # To avoid "unable to open /dev/x: No such file or directory" start_wait = time.time() - while pathlib.Path(path).exists() is False and time.time() - start_wait < 10: + while Path(path).exists() is False and time.time() - start_wait < 10: time.sleep(0.025) if log_formatting: @@ -401,57 +457,57 @@ class Partition: if filesystem == 'btrfs': options = ['-f'] + options - if 'UUID:' not in (mkfs := SysCommand(f"/usr/bin/mkfs.btrfs {' '.join(options)} {path}").decode('UTF-8')): + mkfs = SysCommand(f"/usr/bin/mkfs.btrfs {' '.join(options)} {path}").decode('UTF-8') + if mkfs and 'UUID:' not in mkfs: raise DiskError(f'Could not format {path} with {filesystem} because: {mkfs}') - self.filesystem = filesystem + self._partition_info.filesystem_type = filesystem elif filesystem == 'vfat': options = ['-F32'] + options - + log(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}") if (handle := SysCommand(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}")).exit_code != 0: raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self.filesystem = filesystem + self._partition_info.filesystem_type = filesystem elif filesystem == 'ext4': options = ['-F'] + options if (handle := SysCommand(f"/usr/bin/mkfs.ext4 {' '.join(options)} {path}")).exit_code != 0: raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self.filesystem = filesystem + self._partition_info.filesystem_type = filesystem elif filesystem == 'ext2': options = ['-F'] + options if (handle := SysCommand(f"/usr/bin/mkfs.ext2 {' '.join(options)} {path}")).exit_code != 0: - raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}') - self.filesystem = 'ext2' - + raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") + self._partition_info.filesystem_type = 'ext2' elif filesystem == 'xfs': options = ['-f'] + options if (handle := SysCommand(f"/usr/bin/mkfs.xfs {' '.join(options)} {path}")).exit_code != 0: raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self.filesystem = filesystem + self._partition_info.filesystem_type = filesystem elif filesystem == 'f2fs': options = ['-f'] + options if (handle := SysCommand(f"/usr/bin/mkfs.f2fs {' '.join(options)} {path}")).exit_code != 0: raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self.filesystem = filesystem + self._partition_info.filesystem_type = filesystem elif filesystem == 'ntfs3': options = ['-f'] + options if (handle := SysCommand(f"/usr/bin/mkfs.ntfs -Q {' '.join(options)} {path}")).exit_code != 0: raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self.filesystem = filesystem + self._partition_info.filesystem_type = filesystem elif filesystem == 'crypto_LUKS': # from ..luks import luks2 # encrypted_partition = luks2(self, None, None) # encrypted_partition.format(path) - self.filesystem = filesystem + self._partition_info.filesystem_type = filesystem else: raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.") @@ -460,13 +516,13 @@ class Partition: if retry is True: log(f"Retrying in {storage.get('DISK_TIMEOUTS', 1)} seconds.", level=logging.WARNING, fg="orange") time.sleep(storage.get('DISK_TIMEOUTS', 1)) - + return self.format(filesystem, path, log_formatting, options, retry=False) if get_filesystem_type(path) == 'crypto_LUKS' or get_filesystem_type(self.real_device) == 'crypto_LUKS': - self.encrypted = True + self._encrypted = True else: - self.encrypted = False + self._encrypted = False return True @@ -478,18 +534,18 @@ class Partition: if parent := self.find_parent_of(child, name, parent=data['name']): return parent + return None + def mount(self, target :str, fs :Optional[str] = None, options :str = '') -> bool: - if not self.mountpoint: + if not self._partition_info.get_first_mountpoint(): log(f'Mounting {self} to {target}', level=logging.INFO) if not fs: - if not self.filesystem: - raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.') - fs = self.filesystem + fs = self._partition_info.filesystem_type fs_type = get_mount_fs_type(fs) - pathlib.Path(target).mkdir(parents=True, exist_ok=True) + Path(target).mkdir(parents=True, exist_ok=True) if self.bind_name: device_path = self.device_path @@ -499,7 +555,7 @@ class Partition: else: options = f"subvol={self.bind_name}" else: - device_path = self.path + device_path = self._path try: if options: mnt_handle = SysCommand(f"/usr/bin/mount -t {fs_type} -o {options} {device_path} {target}") @@ -508,7 +564,7 @@ class Partition: # TODO: Should be redundant to check for exit_code if mnt_handle.exit_code != 0: - raise DiskError(f"Could not mount {self.path} to {target} using options {options}") + raise DiskError(f"Could not mount {self._path} to {target} using options {options}") except SysCallError as err: raise err @@ -517,19 +573,17 @@ class Partition: return False def unmount(self) -> bool: - worker = SysCommand(f"/usr/bin/umount {self.path}") + worker = SysCommand(f"/usr/bin/umount {self._path}") + exit_code = worker.exit_code # Without to much research, it seams that low error codes are errors. # And above 8k is indicators such as "/dev/x not mounted.". # So anything in between 0 and 8k are errors (?). - if 0 < worker.exit_code < 8000: - raise SysCallError(f"Could not unmount {self.path} properly: {worker}", exit_code=worker.exit_code) + if exit_code and 0 < exit_code < 8000: + raise SysCallError(f"Could not unmount {self._path} properly: {worker}", exit_code=exit_code) return True - def umount(self) -> bool: - return self.unmount() - def filesystem_supported(self) -> bool: """ The support for a filesystem (this partition) is tested by calling @@ -538,7 +592,7 @@ class Partition: 2. UnknownFilesystemFormat that indicates that we don't support the given filesystem type """ try: - self.format(self.filesystem, '/dev/null', log_formatting=False, allow_formatting=True) + self.format(self._partition_info.filesystem_type, '/dev/null', log_formatting=False) except (SysCallError, DiskError): pass # We supported it, but /dev/null is not formattable as expected so the mkfs call exited with an error code except UnknownFilesystemFormat as err: diff --git a/archinstall/lib/disk/user_guides.py b/archinstall/lib/disk/user_guides.py index 5fa6bfdc..5809c073 100644 --- a/archinstall/lib/disk/user_guides.py +++ b/archinstall/lib/disk/user_guides.py @@ -3,6 +3,8 @@ import logging from typing import Optional, Dict, Any, List, TYPE_CHECKING # https://stackoverflow.com/a/39757388/929999 +from ..models.subvolume import Subvolume + if TYPE_CHECKING: from .blockdevice import BlockDevice _: Any @@ -107,17 +109,14 @@ def suggest_single_disk_layout(block_device :BlockDevice, # https://unix.stackexchange.com/questions/246976/btrfs-subvolume-uuid-clash # https://github.com/classy-giraffe/easy-arch/blob/main/easy-arch.sh layout[block_device.path]['partitions'][1]['btrfs'] = { - "subvolumes" : { - "@":"/", - "@home": "/home", - "@log": "/var/log", - "@pkg": "/var/cache/pacman/pkg", - "@.snapshots": "/.snapshots" - } + 'subvolumes': [ + Subvolume('@', '/'), + Subvolume('@home', '/home'), + Subvolume('@log', '/var/log'), + Subvolume('@pkg', '/var/cache/pacman/pkg'), + Subvolume('@.snapshots', '/.snapshots') + ] } - # else: - # pass # ... implement a guided setup - elif using_home_partition: # If we don't want to use subvolumes, # But we want to be able to re-use data between re-installs.. diff --git a/archinstall/lib/disk/validators.py b/archinstall/lib/disk/validators.py index fd1b7f33..54808886 100644 --- a/archinstall/lib/disk/validators.py +++ b/archinstall/lib/disk/validators.py @@ -7,13 +7,11 @@ def valid_parted_position(pos :str) -> bool: if pos.isdigit(): return True - if pos[-1] == '%' and pos[:-1].isdigit(): + if pos.lower().endswith('b') and pos[:-1].isdigit(): return True - if pos[-3:].lower() in ['mib', 'kib', 'b', 'tib'] and pos[:-3].replace(".", "", 1).isdigit(): - return True - - if pos[-2:].lower() in ['kb', 'mb', 'gb', 'tb'] and pos[:-2].replace(".", "", 1).isdigit(): + if any(pos.lower().endswith(size) and pos[:-len(size)].replace(".", "", 1).isdigit() + for size in ['%', 'kb', 'mb', 'gb', 'tb', 'kib', 'mib', 'gib', 'tib']): return True return False |