Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/disk')
-rw-r--r--archinstall/lib/disk/blockdevice.py348
-rw-r--r--archinstall/lib/disk/btrfs/__init__.py132
-rw-r--r--archinstall/lib/disk/btrfs/btrfs_helpers.py126
-rw-r--r--archinstall/lib/disk/btrfs/btrfspartition.py37
-rw-r--r--archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py (renamed from archinstall/lib/disk/btrfs/btrfssubvolume.py)9
-rw-r--r--archinstall/lib/disk/filesystem.py38
-rw-r--r--archinstall/lib/disk/helpers.py50
-rw-r--r--archinstall/lib/disk/mapperdev.py16
-rw-r--r--archinstall/lib/disk/partition.py426
-rw-r--r--archinstall/lib/disk/user_guides.py19
-rw-r--r--archinstall/lib/disk/validators.py8
11 files changed, 575 insertions, 634 deletions
diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py
index c7b69205..736bacbc 100644
--- a/archinstall/lib/disk/blockdevice.py
+++ b/archinstall/lib/disk/blockdevice.py
@@ -1,13 +1,11 @@
from __future__ import annotations
-import os
import json
import logging
import time
-from functools import cached_property
-from typing import Optional, Dict, Any, Iterator, Tuple, List, TYPE_CHECKING
-# https://stackoverflow.com/a/39757388/929999
-if TYPE_CHECKING:
- from .partition import Partition
+
+from collections import OrderedDict
+from dataclasses import dataclass
+from typing import Optional, Dict, Any, Iterator, List, TYPE_CHECKING
from ..exceptions import DiskError, SysCallError
from ..output import log
@@ -15,18 +13,44 @@ from ..general import SysCommand
from ..storage import storage
+if TYPE_CHECKING:
+ from .partition import Partition
+ _: Any
+
+
+@dataclass
+class BlockSizeInfo:
+ start: str
+ end: str
+ size: str
+
+
+@dataclass
+class BlockInfo:
+ pttype: str
+ ptuuid: str
+ size: int
+ tran: Optional[str]
+ rota: bool
+ free_space: Optional[List[BlockSizeInfo]]
+
+
class BlockDevice:
def __init__(self, path :str, info :Optional[Dict[str, Any]] = None):
if not info:
from .helpers import all_blockdevices
# If we don't give any information, we need to auto-fill it.
# Otherwise any subsequent usage will break.
- info = all_blockdevices(partitions=False)[path].info
+ self.info = all_blockdevices(partitions=False)[path].info
+ else:
+ self.info = info
- self.path = path
- self.info = info
+ self._path = path
self.keep_partitions = True
- self.part_cache = {}
+ self._block_info = self._fetch_information()
+ self._partitions: Dict[str, 'Partition'] = {}
+
+ self._load_partitions()
# TODO: Currently disk encryption is a BIT misleading.
# It's actually partition-encryption, but for future-proofing this
@@ -35,70 +59,113 @@ class BlockDevice:
def __repr__(self, *args :str, **kwargs :str) -> str:
return self._str_repr
- @cached_property
+ @property
+ def path(self) -> str:
+ return self._path
+
+ @property
def _str_repr(self) -> str:
- return f"BlockDevice({self.device_or_backfile}, size={self._safe_size}GB, free_space={self._safe_free_space}, bus_type={self.bus_type})"
-
- @cached_property
- def display_info(self) -> str:
- columns = {
- str(_('Device')): self.device_or_backfile,
- str(_('Size')): f'{self._safe_size}GB',
- str(_('Free space')): f'{self._safe_free_space}',
+ return f"BlockDevice({self._device_or_backfile}, size={self.size}GB, free_space={self._safe_free_space()}, bus_type={self.bus_type})"
+
+ def as_json(self) -> Dict[str, Any]:
+ return {
+ str(_('Device')): self._device_or_backfile,
+ str(_('Size')): f'{self.size}GB',
+ str(_('Free space')): f'{self._safe_free_space()}',
str(_('Bus-type')): f'{self.bus_type}'
}
- padding = max([len(k) for k in columns.keys()])
-
- pretty = ''
- for k, v in columns.items():
- k = k.ljust(padding, ' ')
- pretty += f'{k} = {v}\n'
-
- return pretty.rstrip()
-
- def __iter__(self) -> Iterator[Partition]:
+ def __iter__(self) -> Iterator['Partition']:
for partition in self.partitions:
yield self.partitions[partition]
def __getitem__(self, key :str, *args :str, **kwargs :str) -> Any:
if hasattr(self, key):
return getattr(self, key)
- elif key not in self.info:
- raise KeyError(f'{self} does not contain information: "{key}"')
- return self.info[key]
- def __len__(self) -> int:
- return len(self.partitions)
+ if self.info and key in self.info:
+ return self.info[key]
+
+ raise KeyError(f'{self.info} does not contain information: "{key}"')
def __lt__(self, left_comparitor :'BlockDevice') -> bool:
- return self.path < left_comparitor.path
+ return self._path < left_comparitor.path
def json(self) -> str:
"""
json() has precedence over __dump__, so this is a way
to give less/partial information for user readability.
"""
- return self.path
+ return self._path
def __dump__(self) -> Dict[str, Dict[str, Any]]:
return {
- self.path : {
- 'partuuid' : self.uuid,
- 'wipe' : self.info.get('wipe', None),
- 'partitions' : [part.__dump__() for part in self.partitions.values()]
+ self._path: {
+ 'partuuid': self.uuid,
+ 'wipe': self.info.get('wipe', None),
+ 'partitions': [part.__dump__() for part in self.partitions.values()]
}
}
- @property
- def partition_type(self) -> str:
- output = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.path}").decode('UTF-8'))
+ def _call_lsblk(self, path: str) -> Dict[str, Any]:
+ output = SysCommand(f'lsblk --json -b -o+SIZE,PTTYPE,ROTA,TRAN,PTUUID {self._path}').decode('UTF-8')
+ if output:
+ lsblk_info = json.loads(output)
+ return lsblk_info
+
+ raise DiskError(f'Failed to read disk "{self.path}" with lsblk')
+
+ def _load_partitions(self):
+ from .partition import Partition
+
+ self._partitions.clear()
+
+ lsblk_info = self._call_lsblk(self._path)
+ device = lsblk_info['blockdevices'][0]
+ self._partitions.clear()
+
+ if children := device.get('children', None):
+ root = f'/dev/{device["name"]}'
+ for child in children:
+ part_id = child['name'].removeprefix(device['name'])
+ self._partitions[part_id] = Partition(root + part_id, block_device=self, part_id=part_id)
+
+ def _get_free_space(self) -> Optional[List[BlockSizeInfo]]:
+ # NOTE: parted -s will default to `cancel` on prompt, skipping any partition
+ # that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso,
+ # so the free will ignore the ESP partition and just give the "free" space.
+ # Doesn't harm us, but worth noting in case something weird happens.
+ try:
+ output = SysCommand(f"parted -s --machine {self._path} print free").decode('utf-8')
+ if output:
+ free_lines = [line for line in output.split('\n') if 'free' in line]
+ sizes = []
+ for free_space in free_lines:
+ _, start, end, size, *_ = free_space.strip('\r\n;').split(':')
+ sizes.append(BlockSizeInfo(start, end, size))
+
+ return sizes
+ except SysCallError as error:
+ log(f"Could not get free space on {self._path}: {error}", level=logging.DEBUG)
+
+ return None
+
+ def _fetch_information(self) -> BlockInfo:
+ lsblk_info = self._call_lsblk(self._path)
+ device = lsblk_info['blockdevices'][0]
+ free_space = self._get_free_space()
- for device in output['blockdevices']:
- return device['pttype']
+ return BlockInfo(
+ pttype=device['pttype'],
+ ptuuid=device['ptuuid'],
+ size=device['size'],
+ tran=device['tran'],
+ rota=device['rota'],
+ free_space=free_space
+ )
- @cached_property
- def device_or_backfile(self) -> str:
+ @property
+ def _device_or_backfile(self) -> Optional[str]:
"""
Returns the actual device-endpoint of the BlockDevice.
If it's a loop-back-device it returns the back-file,
@@ -118,7 +185,7 @@ class BlockDevice:
return None
@property
- def device(self) -> str:
+ def device(self) -> Optional[str]:
"""
Returns the device file of the BlockDevice.
If it's a loop-back-device it returns the /dev/X device,
@@ -126,168 +193,82 @@ class BlockDevice:
And if it's a crypto-device it returns the parent device
"""
if "DEVTYPE" not in self.info:
- raise DiskError(f'Could not locate backplane info for "{self.path}"')
+ raise DiskError(f'Could not locate backplane info for "{self._path}"')
if self.info['DEVTYPE'] in ['disk','loop']:
- return self.path
+ return self._path
elif self.info['DEVTYPE'][:4] == 'raid':
# This should catch /dev/md## raid devices
- return self.path
+ return self._path
elif self.info['DEVTYPE'] == 'crypt':
if 'pkname' not in self.info:
- raise DiskError(f'A crypt device ({self.path}) without a parent kernel device name.')
+ raise DiskError(f'A crypt device ({self._path}) without a parent kernel device name.')
return f"/dev/{self.info['pkname']}"
else:
- log(f"Unknown blockdevice type for {self.path}: {self.info['DEVTYPE']}", level=logging.DEBUG)
-
- # if not stat.S_ISBLK(os.stat(full_path).st_mode):
- # raise DiskError(f'Selected disk "{full_path}" is not a block device.')
-
- @property
- def partitions(self) -> Dict[str, Partition]:
- from .filesystem import Partition
-
- self.partprobe()
- result = SysCommand(['/usr/bin/lsblk', '-J', self.path])
-
- if b'not a block device' in result:
- raise DiskError(f'Can not read partitions off something that isn\'t a block device: {self.path}')
+ log(f"Unknown blockdevice type for {self._path}: {self.info['DEVTYPE']}", level=logging.DEBUG)
- if not result[:1] == b'{':
- raise DiskError('Error getting JSON output from:', f'/usr/bin/lsblk -J {self.path}')
-
- r = json.loads(result.decode('UTF-8'))
- if len(r['blockdevices']) and 'children' in r['blockdevices'][0]:
- root_path = f"/dev/{r['blockdevices'][0]['name']}"
- for part in r['blockdevices'][0]['children']:
- part_id = part['name'][len(os.path.basename(self.path)):]
- if part_id not in self.part_cache:
- # TODO: Force over-write even if in cache?
- if part_id not in self.part_cache or self.part_cache[part_id].size != part['size']:
- self.part_cache[part_id] = Partition(root_path + part_id, block_device=self, part_id=part_id)
-
- return {k: self.part_cache[k] for k in sorted(self.part_cache)}
+ return None
@property
- def partition(self) -> Partition:
- all_partitions = self.partitions
- return [all_partitions[k] for k in all_partitions]
+ def partition_type(self) -> str:
+ return self._block_info.pttype
@property
- def partition_table_type(self) -> int:
- # TODO: Don't hardcode :)
- # Remove if we don't use this function anywhere
- from .filesystem import GPT
- return GPT
-
- @cached_property
def uuid(self) -> str:
- log('BlockDevice().uuid is untested!', level=logging.WARNING, fg='yellow')
- """
- Returns the disk UUID as returned by lsblk.
- This is more reliable than relying on /dev/disk/by-partuuid as
- it doesn't seam to be able to detect md raid partitions.
- """
- return SysCommand(f'blkid -s PTUUID -o value {self.path}').decode('UTF-8')
-
- @cached_property
- def _safe_size(self) -> float:
- from .helpers import convert_size_to_gb
-
- try:
- output = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.path}").decode('UTF-8'))
- except SysCallError:
- return -1.0
-
- for device in output['blockdevices']:
- return convert_size_to_gb(device['size'])
+ return self._block_info.ptuuid
- @cached_property
+ @property
def size(self) -> float:
from .helpers import convert_size_to_gb
+ return convert_size_to_gb(self._block_info.size)
- output = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.path}").decode('UTF-8'))
-
- for device in output['blockdevices']:
- return convert_size_to_gb(device['size'])
-
- @cached_property
- def bus_type(self) -> str:
- output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8'))
-
- for device in output['blockdevices']:
- return device['tran']
+ @property
+ def bus_type(self) -> Optional[str]:
+ return self._block_info.tran
- @cached_property
+ @property
def spinning(self) -> bool:
- output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8'))
+ return self._block_info.rota
- for device in output['blockdevices']:
- return device['rota'] is True
+ @property
+ def partitions(self) -> Dict[str, 'Partition']:
+ return OrderedDict(sorted(self._partitions.items()))
- @cached_property
- def _safe_free_space(self) -> Tuple[str, ...]:
- try:
- return '+'.join(part[2] for part in self.free_space)
- except SysCallError:
- return '?'
+ @property
+ def partition(self) -> List['Partition']:
+ return list(self.partitions.values())
- @cached_property
- def free_space(self) -> Tuple[str, ...]:
- # NOTE: parted -s will default to `cancel` on prompt, skipping any partition
- # that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso,
- # so the free will ignore the ESP partition and just give the "free" space.
- # Doesn't harm us, but worth noting in case something weird happens.
- try:
- for line in SysCommand(f"parted -s --machine {self.path} print free"):
- if 'free' in (free_space := line.decode('UTF-8')):
- _, start, end, size, *_ = free_space.strip('\r\n;').split(':')
- yield (start, end, size)
- except SysCallError as error:
- log(f"Could not get free space on {self.path}: {error}", level=logging.DEBUG)
-
- @cached_property
- def largest_free_space(self) -> List[str]:
- info = []
- for space_info in self.free_space:
- if not info:
- info = space_info
- else:
- # [-1] = size
- if space_info[-1] > info[-1]:
- info = space_info
- return info
-
- @cached_property
+ @property
def first_free_sector(self) -> str:
- if info := self.largest_free_space:
- start = info[0]
+ if block_size := self._largest_free_space():
+ return block_size.start
else:
- start = '512MB'
- return start
+ return '512MB'
- @cached_property
+ @property
def first_end_sector(self) -> str:
- if info := self.largest_free_space:
- end = info[1]
+ if block_size := self._largest_free_space():
+ return block_size.end
else:
- end = f"{self.size}GB"
- return end
-
- def partprobe(self) -> bool:
- return SysCommand(['partprobe', self.path]).exit_code == 0
+ return f"{self.size}GB"
+
+ def _safe_free_space(self) -> str:
+ if self._block_info.free_space:
+ sizes = [free_space.size for free_space in self._block_info.free_space]
+ return '+'.join(sizes)
+ return '?'
+
+ def _largest_free_space(self) -> Optional[BlockSizeInfo]:
+ if self._block_info.free_space:
+ sorted_sizes = sorted(self._block_info.free_space, key=lambda x: x.size, reverse=True)
+ return sorted_sizes[0]
+ return None
- def has_partitions(self) -> int:
- return len(self.partitions)
-
- def has_mount_point(self, mountpoint :str) -> bool:
- for partition in self.partitions:
- if self.partitions[partition].mountpoint == mountpoint:
- return True
- return False
+ def _partprobe(self) -> bool:
+ return SysCommand(['partprobe', self._path]).exit_code == 0
def flush_cache(self) -> None:
- self.part_cache = {}
+ self._load_partitions()
def get_partition(self, uuid :Optional[str] = None, partuuid :Optional[str] = None) -> Partition:
if not uuid and not partuuid:
@@ -296,9 +277,9 @@ class BlockDevice:
for count in range(storage.get('DISK_RETRY_ATTEMPTS', 5)):
for partition_index, partition in self.partitions.items():
try:
- if uuid and partition.uuid.lower() == uuid.lower():
+ if uuid and partition.uuid and partition.uuid.lower() == uuid.lower():
return partition
- elif partuuid and partition.part_uuid.lower() == partuuid.lower():
+ elif partuuid and partition.part_uuid and partition.part_uuid.lower() == partuuid.lower():
return partition
except DiskError as error:
# Most likely a blockdevice that doesn't support or use UUID's
@@ -307,9 +288,10 @@ class BlockDevice:
pass
log(f"uuid {uuid} or {partuuid} not found. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s for next attempt",level=logging.DEBUG)
+ self.flush_cache()
time.sleep(storage.get('DISK_TIMEOUTS', 1) * count)
-
+
log(f"Could not find {uuid}/{partuuid} in disk after 5 retries", level=logging.INFO)
- log(f"Cache: {self.part_cache}")
+ log(f"Cache: {self._partitions}")
log(f"Partitions: {self.partitions.items()}")
raise DiskError(f"Partition {uuid}/{partuuid} was never found on {self} despite several attempts.")
diff --git a/archinstall/lib/disk/btrfs/__init__.py b/archinstall/lib/disk/btrfs/__init__.py
index 84b9c0f6..a26e0160 100644
--- a/archinstall/lib/disk/btrfs/__init__.py
+++ b/archinstall/lib/disk/btrfs/__init__.py
@@ -2,8 +2,7 @@ from __future__ import annotations
import pathlib
import glob
import logging
-import re
-from typing import Union, Dict, TYPE_CHECKING, Any, Iterator
+from typing import Union, Dict, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
@@ -15,30 +14,15 @@ from .btrfs_helpers import (
setup_subvolumes as setup_subvolumes,
mount_subvolume as mount_subvolume
)
-from .btrfssubvolume import BtrfsSubvolume as BtrfsSubvolume
+from .btrfssubvolumeinfo import BtrfsSubvolumeInfo as BtrfsSubvolume
from .btrfspartition import BTRFSPartition as BTRFSPartition
-from ..helpers import get_mount_info
from ...exceptions import DiskError, Deprecated
from ...general import SysCommand
from ...output import log
-from ...exceptions import SysCallError
-def get_subvolume_info(path :pathlib.Path) -> Dict[str, Any]:
- try:
- output = SysCommand(f"btrfs subvol show {path}").decode()
- except SysCallError as error:
- print('Error:', error)
- result = {}
- for line in output.replace('\r\n', '\n').split('\n'):
- if ':' in line:
- key, val = line.replace('\t', '').split(':', 1)
- result[key.strip().lower().replace(' ', '_')] = val.strip()
-
- return result
-
-def create_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str]) -> bool:
+def create_subvolume(installation: Installer, subvolume_location :Union[pathlib.Path, str]) -> bool:
"""
This function uses btrfs to create a subvolume.
@@ -70,113 +54,3 @@ def create_subvolume(installation :Installer, subvolume_location :Union[pathlib.
log(f"Creating a subvolume on {target}", level=logging.INFO)
if (cmd := SysCommand(f"btrfs subvolume create {target}")).exit_code != 0:
raise DiskError(f"Could not create a subvolume at {target}: {cmd}")
-
-def _has_option(option :str,options :list) -> bool:
- """ auxiliary routine to check if an option is present in a list.
- we check if the string appears in one of the options, 'cause it can appear in severl forms (option, option=val,...)
- """
- if not options:
- return False
-
- for item in options:
- if option in item:
- return True
-
- return False
-
-def manage_btrfs_subvolumes(installation :Installer,
- partition :Dict[str, str],) -> list:
-
- raise Deprecated("Use setup_subvolumes() instead.")
-
- from copy import deepcopy
- """ we do the magic with subvolumes in a centralized place
- parameters:
- * the installation object
- * the partition dictionary entry which represents the physical partition
- returns
- * mountpoinst, the list which contains all the "new" partititon to be mounted
-
- We expect the partition has been mounted as / , and it to be unmounted after the processing
- Then we create all the subvolumes inside btrfs as demand
- We clone then, both the partition dictionary and the object inside it and adapt it to the subvolume needs
- Then we return a list of "new" partitions to be processed as "normal" partitions
- # TODO For encrypted devices we need some special processing prior to it
- """
- # We process each of the pairs <subvolume name: mount point | None | mount info dict>
- # th mount info dict has an entry for the path of the mountpoint (named 'mountpoint') and 'options' which is a list
- # of mount options (or similar used by brtfs)
- mountpoints = []
- subvolumes = partition['btrfs']['subvolumes']
- for name, right_hand in subvolumes.items():
- try:
- # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load - every subvolume is created from the top of the hierarchy- and simplifies its further use
- if name.startswith('/'):
- name = name[1:]
- # renormalize the right hand.
- location = None
- subvol_options = []
- # no contents, so it is not to be mounted
- if not right_hand:
- location = None
- # just a string. per backward compatibility the mount point
- elif isinstance(right_hand,str):
- location = right_hand
- # a dict. two elements 'mountpoint' (obvious) and and a mount options list ¿?
- elif isinstance(right_hand,dict):
- location = right_hand.get('mountpoint',None)
- subvol_options = right_hand.get('options',[])
- # we create the subvolume
- create_subvolume(installation,name)
- # Make the nodatacow processing now
- # It will be the main cause of creation of subvolumes which are not to be mounted
- # it is not an options which can be established by subvolume (but for whole file systems), and can be
- # set up via a simple attribute change in a directory (if empty). And here the directories are brand new
- if 'nodatacow' in subvol_options:
- if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0:
- raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}")
- # entry is deleted so nodatacow doesn't propagate to the mount options
- del subvol_options[subvol_options.index('nodatacow')]
- # Make the compress processing now
- # it is not an options which can be established by subvolume (but for whole file systems), and can be
- # set up via a simple attribute change in a directory (if empty). And here the directories are brand new
- # in this way only zstd compression is activaded
- # TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated
- if 'compress' in subvol_options:
- if not _has_option('compress',partition.get('filesystem',{}).get('mount_options',[])):
- if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0:
- raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}")
- # entry is deleted so compress doesn't propagate to the mount options
- del subvol_options[subvol_options.index('compress')]
- # END compress processing.
- # we do not mount if THE basic partition will be mounted or if we exclude explicitly this subvolume
- if not partition['mountpoint'] and location is not None:
- # we begin to create a fake partition entry. First we copy the original -the one that corresponds to
- # the primary partition. We make a deepcopy to avoid altering the original content in any case
- fake_partition = deepcopy(partition)
- # we start to modify entries in the "fake partition" to match the needs of the subvolumes
- # to avoid any chance of entering in a loop (not expected) we delete the list of subvolumes in the copy
- del fake_partition['btrfs']
- fake_partition['encrypted'] = False
- fake_partition['generate-encryption-key-file'] = False
- # Mount destination. As of now the right hand part
- fake_partition['mountpoint'] = location
- # we load the name in an attribute called subvolume, but i think it is not needed anymore, 'cause the mount logic uses a different path.
- fake_partition['subvolume'] = name
- # here we add the special mount options for the subvolume, if any.
- # if the original partition['options'] is not a list might give trouble
- if fake_partition.get('filesystem',{}).get('mount_options',[]):
- fake_partition['filesystem']['mount_options'].extend(subvol_options)
- else:
- fake_partition['filesystem']['mount_options'] = subvol_options
- # Here comes the most exotic part. The dictionary attribute 'device_instance' contains an instance of Partition. This instance will be queried along the mount process at the installer.
- # As the rest will query there the path of the "partition" to be mounted, we feed it with the bind name needed to mount subvolumes
- # As we made a deepcopy we have a fresh instance of this object we can manipulate problemless
- fake_partition['device_instance'].path = f"{partition['device_instance'].path}[/{name}]"
-
- # Well, now that this "fake partition" is ready, we add it to the list of the ones which are to be mounted,
- # as "normal" ones
- mountpoints.append(fake_partition)
- except Exception as e:
- raise e
- return mountpoints
diff --git a/archinstall/lib/disk/btrfs/btrfs_helpers.py b/archinstall/lib/disk/btrfs/btrfs_helpers.py
index d577d82b..f6d2734a 100644
--- a/archinstall/lib/disk/btrfs/btrfs_helpers.py
+++ b/archinstall/lib/disk/btrfs/btrfs_helpers.py
@@ -1,72 +1,73 @@
-import pathlib
import logging
-from typing import Optional
+import re
+from pathlib import Path
+from typing import Optional, Dict, Any, TYPE_CHECKING
+from ...models.subvolume import Subvolume
from ...exceptions import SysCallError, DiskError
from ...general import SysCommand
from ...output import log
+from ...plugins import plugins
from ..helpers import get_mount_info
-from .btrfssubvolume import BtrfsSubvolume
+from .btrfssubvolumeinfo import BtrfsSubvolumeInfo
+if TYPE_CHECKING:
+ from .btrfspartition import BTRFSPartition
+ from ...installer import Installer
-def mount_subvolume(installation, device, name, subvolume_information):
- # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load.
- # Every subvolume is created from the top of the hierarchy- and simplifies its further use
- name = name.lstrip('/')
- # renormalize the right hand.
- mountpoint = subvolume_information.get('mountpoint', None)
- if not mountpoint:
- return None
+class fstab_btrfs_compression_plugin():
+ def __init__(self, partition_dict):
+ self.partition_dict = partition_dict
+
+ def on_genfstab(self, installation):
+ with open(f"{installation.target}/etc/fstab", 'r') as fh:
+ fstab = fh.read()
+
+ # Replace the {installation}/etc/fstab with entries
+ # using the compress=zstd where the mountpoint has compression set.
+ with open(f"{installation.target}/etc/fstab", 'w') as fh:
+ for line in fstab.split('\n'):
+ # So first we grab the mount options by using subvol=.*? as a locator.
+ # And we also grab the mountpoint for the entry, for instance /var/log
+ if (subvoldef := re.findall(',.*?subvol=.*?[\t ]', line)) and (mountpoint := re.findall('[\t ]/.*?[\t ]', line)):
+ for subvolume in self.partition_dict.get('btrfs', {}).get('subvolumes', []):
+ # We then locate the correct subvolume and check if it's compressed
+ if subvolume.compress and subvolume.mountpoint == mountpoint[0].strip():
+ # We then sneak in the compress=zstd option if it doesn't already exist:
+ # We skip entries where compression is already defined
+ if ',compress=zstd,' not in line:
+ line = line.replace(subvoldef[0], f",compress=zstd{subvoldef[0]}")
+ break
+
+ fh.write(f"{line}\n")
- if type(mountpoint) == str:
- mountpoint = pathlib.Path(mountpoint)
+ return True
- installation_target = installation.target
- if type(installation_target) == str:
- installation_target = pathlib.Path(installation_target)
+
+def mount_subvolume(installation: 'Installer', device: 'BTRFSPartition', subvolume: Subvolume):
+ # we normalize the subvolume name (getting rid of slash at the start if exists.
+ # In our implementation has no semantic load.
+ # Every subvolume is created from the top of the hierarchy- and simplifies its further use
+ name = subvolume.name.lstrip('/')
+ mountpoint = Path(subvolume.mountpoint)
+ installation_target = Path(installation.target)
mountpoint = installation_target / mountpoint.relative_to(mountpoint.anchor)
mountpoint.mkdir(parents=True, exist_ok=True)
-
- mount_options = subvolume_information.get('options', [])
- if not any('subvol=' in x for x in mount_options):
- mount_options += [f'subvol={name}']
+ mount_options = subvolume.options + [f'subvol={name}']
log(f"Mounting subvolume {name} on {device} to {mountpoint}", level=logging.INFO, fg="gray")
SysCommand(f"mount {device.path} {mountpoint} -o {','.join(mount_options)}")
-def setup_subvolumes(installation, partition_dict):
- """
- Taken from: ..user_guides.py
-
- partition['btrfs'] = {
- "subvolumes" : {
- "@": "/",
- "@home": "/home",
- "@log": "/var/log",
- "@pkg": "/var/cache/pacman/pkg",
- "@.snapshots": "/.snapshots"
- }
- }
- """
+def setup_subvolumes(installation: 'Installer', partition_dict: Dict[str, Any]):
log(f"Setting up subvolumes: {partition_dict['btrfs']['subvolumes']}", level=logging.INFO, fg="gray")
- for name, right_hand in partition_dict['btrfs']['subvolumes'].items():
- # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load.
- # Every subvolume is created from the top of the hierarchy- and simplifies its further use
- name = name.lstrip('/')
- # renormalize the right hand.
- # mountpoint = None
- subvol_options = []
-
- match right_hand:
- # case str(): # backwards-compatability
- # mountpoint = right_hand
- case dict():
- # mountpoint = right_hand.get('mountpoint', None)
- subvol_options = right_hand.get('options', [])
+ for subvolume in partition_dict['btrfs']['subvolumes']:
+ # we normalize the subvolume name (getting rid of slash at the start if exists. In our implementation has no semantic load.
+ # Every subvolume is created from the top of the hierarchy- and simplifies its further use
+ name = subvolume.name.lstrip('/')
# We create the subvolume using the BTRFSPartition instance.
# That way we ensure not only easy access, but also accurate mount locations etc.
@@ -76,27 +77,28 @@ def setup_subvolumes(installation, partition_dict):
# It will be the main cause of creation of subvolumes which are not to be mounted
# it is not an options which can be established by subvolume (but for whole file systems), and can be
# set up via a simple attribute change in a directory (if empty). And here the directories are brand new
- if 'nodatacow' in subvol_options:
+ if subvolume.nodatacow:
if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0:
raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}")
- # entry is deleted so nodatacow doesn't propagate to the mount options
- del subvol_options[subvol_options.index('nodatacow')]
+
# Make the compress processing now
# it is not an options which can be established by subvolume (but for whole file systems), and can be
# set up via a simple attribute change in a directory (if empty). And here the directories are brand new
# in this way only zstd compression is activaded
# TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated
- if 'compress' in subvol_options:
+ if subvolume.compress:
if not any(['compress' in filesystem_option for filesystem_option in partition_dict.get('filesystem', {}).get('mount_options', [])]):
if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0:
raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}")
- # entry is deleted so compress doesn't propagate to the mount options
- del subvol_options[subvol_options.index('compress')]
-def subvolume_info_from_path(path :pathlib.Path) -> Optional[BtrfsSubvolume]:
+ if 'fstab_btrfs_compression_plugin' not in plugins:
+ plugins['fstab_btrfs_compression_plugin'] = fstab_btrfs_compression_plugin(partition_dict)
+
+
+def subvolume_info_from_path(path: Path) -> Optional[BtrfsSubvolumeInfo]:
try:
- subvolume_name = None
+ subvolume_name = ''
result = {}
for index, line in enumerate(SysCommand(f"btrfs subvolume show {path}")):
if index == 0:
@@ -110,14 +112,14 @@ def subvolume_info_from_path(path :pathlib.Path) -> Optional[BtrfsSubvolume]:
# allows for hooking in a pre-processor to do this we have to do it here:
result[key.lower().replace(' ', '_').replace('(s)', 's')] = value.strip()
- return BtrfsSubvolume(**{'full_path' : path, 'name' : subvolume_name, **result})
-
+ return BtrfsSubvolumeInfo(**{'full_path' : path, 'name' : subvolume_name, **result}) # type: ignore
except SysCallError as error:
log(f"Could not retrieve subvolume information from {path}: {error}", level=logging.WARNING, fg="orange")
return None
-def find_parent_subvolume(path :pathlib.Path, filters=[]):
+
+def find_parent_subvolume(path: Path, filters=[]) -> Optional[BtrfsSubvolumeInfo]:
# A root path cannot have a parent
if str(path) == '/':
return None
@@ -127,6 +129,8 @@ def find_parent_subvolume(path :pathlib.Path, filters=[]):
if found_mount['target'] == '/':
return None
- return find_parent_subvolume(path.parent, traverse=True, filters=[*filters, found_mount['target']])
+ return find_parent_subvolume(path.parent, filters=[*filters, found_mount['target']])
- return subvolume \ No newline at end of file
+ return subvolume
+
+ return None
diff --git a/archinstall/lib/disk/btrfs/btrfspartition.py b/archinstall/lib/disk/btrfs/btrfspartition.py
index 5020133d..d04c9b98 100644
--- a/archinstall/lib/disk/btrfs/btrfspartition.py
+++ b/archinstall/lib/disk/btrfs/btrfspartition.py
@@ -15,24 +15,13 @@ from .btrfs_helpers import (
if TYPE_CHECKING:
from ...installer import Installer
- from .btrfssubvolume import BtrfsSubvolume
+ from .btrfssubvolumeinfo import BtrfsSubvolumeInfo
+
class BTRFSPartition(Partition):
def __init__(self, *args, **kwargs):
Partition.__init__(self, *args, **kwargs)
- def __repr__(self, *args :str, **kwargs :str) -> str:
- mount_repr = ''
- if self.mountpoint:
- mount_repr = f", mounted={self.mountpoint}"
- elif self.target_mountpoint:
- mount_repr = f", rel_mountpoint={self.target_mountpoint}"
-
- if self._encrypted:
- return f'BTRFSPartition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, parent={self.real_device}, fs={self.filesystem}{mount_repr})'
- else:
- return f'BTRFSPartition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})'
-
@property
def subvolumes(self):
for filesystem in findmnt(pathlib.Path(self.path), recurse=True).get('filesystems', []):
@@ -40,17 +29,17 @@ class BTRFSPartition(Partition):
yield subvolume_info_from_path(filesystem['target'])
def iterate_children(struct):
- for child in struct.get('children', []):
+ for c in struct.get('children', []):
if '[' in child.get('source', ''):
- yield subvolume_info_from_path(child['target'])
+ yield subvolume_info_from_path(c['target'])
- for sub_child in iterate_children(child):
+ for sub_child in iterate_children(c):
yield sub_child
for child in iterate_children(filesystem):
yield child
- def create_subvolume(self, subvolume :pathlib.Path, installation :Optional['Installer'] = None) -> 'BtrfsSubvolume':
+ def create_subvolume(self, subvolume :pathlib.Path, installation :Optional['Installer'] = None) -> 'BtrfsSubvolumeInfo':
"""
Subvolumes have to be created within a mountpoint.
This means we need to get the current installation target.
@@ -62,13 +51,13 @@ class BTRFSPartition(Partition):
if not installation:
installation = storage.get('installation_session')
- # Determain if the path given, is an absolute path or a releative path.
+ # Determain if the path given, is an absolute path or a relative path.
# We do this by checking if the path contains a known mountpoint.
if str(subvolume)[0] == '/':
if filesystems := findmnt(subvolume, traverse=True).get('filesystems'):
if (target := filesystems[0].get('target')) and target != '/' and str(subvolume).startswith(target):
# Path starts with a known mountpoint which isn't /
- # Which means it's an absolut path to a mounted location.
+ # Which means it's an absolute path to a mounted location.
pass
else:
# Since it's not an absolute position with a known start.
@@ -108,9 +97,13 @@ class BTRFSPartition(Partition):
if glob.glob(str(subvolume / '*')):
raise DiskError(f"Cannot create subvolume at {subvolume} because it contains data (non-empty folder target is not supported by BTRFS)")
- elif subvolinfo := subvolume_info_from_path(subvolume):
- raise DiskError(f"Destination {subvolume} is already a subvolume: {subvolinfo}")
+ # Ideally we would like to check if the destination is already a subvolume.
+ # But then we would need the mount-point at this stage as well.
+ # So we'll comment out this check:
+ # elif subvolinfo := subvolume_info_from_path(subvolume):
+ # raise DiskError(f"Destination {subvolume} is already a subvolume: {subvolinfo}")
+ # And deal with it here:
SysCommand(f"btrfs subvolume create {subvolume}")
- return subvolume_info_from_path(subvolume) \ No newline at end of file
+ return subvolume_info_from_path(subvolume)
diff --git a/archinstall/lib/disk/btrfs/btrfssubvolume.py b/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py
index a96e2a94..5f5bdea6 100644
--- a/archinstall/lib/disk/btrfs/btrfssubvolume.py
+++ b/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py
@@ -16,8 +16,9 @@ from ...general import SysCommand
from ...output import log
from ...storage import storage
+
@dataclass
-class BtrfsSubvolume:
+class BtrfsSubvolumeInfo:
full_path :pathlib.Path
name :str
uuid :str
@@ -68,9 +69,9 @@ class BtrfsSubvolume:
from .btrfs_helpers import subvolume_info_from_path
# TODO: Make this function traverse storage['MOUNT_POINT'] and find the first
- # occurance of a mountpoint that is a btrfs volume instead of lazy assume / is a subvolume.
+ # occurrence of a mountpoint that is a btrfs volume instead of lazy assume / is a subvolume.
# It would also be nice if it could use findmnt(self.full_path) and traverse backwards
- # finding the last occurance of a subvolume which 'self' belongs to.
+ # finding the last occurrence of a subvolume which 'self' belongs to.
if volume := subvolume_info_from_path(storage['MOUNT_POINT']):
return self.full_path == volume.full_path
@@ -188,4 +189,4 @@ class BtrfsSubvolume:
def unmount(self, recurse :bool = True):
SysCommand(f"umount {'-R' if recurse else ''} {self.full_path}")
- log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray") \ No newline at end of file
+ log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray")
diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py
index f94b4b47..5d5952a0 100644
--- a/archinstall/lib/disk/filesystem.py
+++ b/archinstall/lib/disk/filesystem.py
@@ -74,7 +74,7 @@ class Filesystem:
raise KeyError(f"Could not create a GPT label on {self}")
elif self.mode == MBR:
if not self.parted_mklabel(self.blockdevice.device, "msdos"):
- raise KeyError(f"Could not create a MSDOS label on {self}")
+ raise KeyError(f"Could not create a MS-DOS label on {self}")
self.blockdevice.flush_cache()
time.sleep(3)
@@ -100,7 +100,7 @@ class Filesystem:
partition['device_instance'] = self.blockdevice.get_partition(uuid=partition_uuid)
except DiskError:
partition['device_instance'] = self.blockdevice.get_partition(partuuid=partition_uuid)
-
+
log(_("Re-using partition instance: {}").format(partition['device_instance']), level=logging.DEBUG, fg="gray")
else:
log(f"{self}.load_layout() doesn't know how to work without 'wipe' being set or UUID ({partition.get('PARTUUID')}) was given and found.", fg="yellow", level=logging.WARNING)
@@ -210,7 +210,14 @@ class Filesystem:
# TODO: Implement this with declarative profiles instead.
raise ValueError("Installation().use_entire_disk() has to be re-worked.")
- def add_partition(self, partition_type :str, start :str, end :str, partition_format :Optional[str] = None, skip_mklabel :bool = False) -> Partition:
+ def add_partition(
+ self,
+ partition_type :str,
+ start :str,
+ end :str,
+ partition_format :Optional[str] = None,
+ skip_mklabel :bool = False
+ ) -> Partition:
log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO)
if len(self.blockdevice.partitions) == 0 and skip_mklabel is False:
@@ -221,7 +228,7 @@ class Filesystem:
raise KeyError(f"Could not create a GPT label on {self}")
elif self.mode == MBR:
if not self.parted_mklabel(self.blockdevice.device, "msdos"):
- raise KeyError(f"Could not create a MSDOS label on {self}")
+ raise KeyError(f"Could not create a MS-DOS label on {self}")
self.blockdevice.flush_cache()
@@ -232,6 +239,7 @@ class Filesystem:
except DiskError:
pass
+ # TODO this check should probably run in the setup process rather than during the installation
if self.mode == MBR:
if len(self.blockdevice.partitions) > 3:
DiskError("Too many partitions on disk, MBR disks can only have 3 primary partitions")
@@ -245,15 +253,9 @@ class Filesystem:
if self.parted(parted_string):
for count in range(storage.get('DISK_RETRY_ATTEMPTS', 3)):
- self.partprobe()
-
- new_partition_uuids = []
- for partition in self.blockdevice.partitions.values():
- try:
- new_partition_uuids.append(partition.part_uuid)
- except DiskError:
- pass
+ self.blockdevice.flush_cache()
+ new_partition_uuids = [partition.part_uuid for partition in self.blockdevice.partitions.values()]
new_partuuid_set = (set(previous_partuuids) ^ set(new_partition_uuids))
if len(new_partuuid_set) and (new_partuuid := new_partuuid_set.pop()):
@@ -263,17 +265,23 @@ class Filesystem:
log(f'Blockdevice: {self.blockdevice}', level=logging.ERROR, fg="red")
log(f'Partitions: {self.blockdevice.partitions}', level=logging.ERROR, fg="red")
log(f'Partition set: {new_partuuid_set}', level=logging.ERROR, fg="red")
- log(f'New UUID: {[new_partuuid]}', level=logging.ERROR, fg="red")
+ log(f'New PARTUUID: {[new_partuuid]}', level=logging.ERROR, fg="red")
log(f'get_partition(): {self.blockdevice.get_partition}', level=logging.ERROR, fg="red")
raise err
else:
log(f"Could not get UUID for partition. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s before retrying.",level=logging.DEBUG)
- time.sleep(storage.get('DISK_TIMEOUTS', 1) * count)
+ self.partprobe()
+ time.sleep(max(0.1, storage.get('DISK_TIMEOUTS', 1)))
+ else:
+ print("Parted did not return True during partition creation")
+
+ total_partitions = set([partition.part_uuid for partition in self.blockdevice.partitions.values()])
+ total_partitions.update(previous_partuuids)
# TODO: This should never be able to happen
log(f"Could not find the new PARTUUID after adding the partition.", level=logging.ERROR, fg="red")
log(f"Previous partitions: {previous_partuuids}", level=logging.ERROR, fg="red")
- log(f"New partitions: {(previous_partuuids ^ {partition.part_uuid for partition in self.blockdevice.partitions.values()})}", level=logging.ERROR, fg="red")
+ log(f"New partitions: {total_partitions}", level=logging.ERROR, fg="red")
raise DiskError(f"Could not add partition using: {parted_string}")
def set_name(self, partition: int, name: str) -> bool:
diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py
index 99856aad..f19125f4 100644
--- a/archinstall/lib/disk/helpers.py
+++ b/archinstall/lib/disk/helpers.py
@@ -8,6 +8,8 @@ import time
import glob
from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
+from ..models.subvolume import Subvolume
+
if TYPE_CHECKING:
from .partition import Partition
@@ -112,7 +114,7 @@ def cleanup_bash_escapes(data :str) -> str:
def blkid(cmd :str) -> Dict[str, Any]:
if '-o' in cmd and '-o export' not in cmd:
- raise ValueError(f"blkid() requires '-o export' to be used and can therefor not continue reliably.")
+ raise ValueError(f"blkid() requires '-o export' to be used and can therefore not continue reliably.")
elif '-o' not in cmd:
cmd += ' -o export'
@@ -133,7 +135,7 @@ def blkid(cmd :str) -> Dict[str, Any]:
key, val = line.split('=', 1)
if key.lower() == 'devname':
devname = val
- # Lowercase for backwards compatability with all_disks() previous use cases
+ # Lowercase for backwards compatibility with all_disks() previous use cases
result[devname] = {
"path": devname,
"PATH": devname
@@ -218,7 +220,12 @@ def all_blockdevices(mappers=False, partitions=False, error=False) -> Dict[str,
# we'll iterate the /sys/class definitions and find the information
# from there.
for block_device in glob.glob("/sys/class/block/*"):
- device_path = f"/dev/{pathlib.Path(block_device).readlink().name}"
+ device_path = pathlib.Path(f"/dev/{pathlib.Path(block_device).readlink().name}")
+
+ if device_path.exists() is False:
+ log(f"Unknown device found by '/sys/class/block/*', ignoring: {device_path}", level=logging.WARNING, fg="yellow")
+ continue
+
try:
information = blkid(f'blkid -p -o export {device_path}')
except SysCallError as ex:
@@ -227,12 +234,17 @@ def all_blockdevices(mappers=False, partitions=False, error=False) -> Dict[str,
try:
information = get_loop_info(device_path)
if not information:
+ print("Exit code for blkid -p -o export was:", ex.exit_code)
raise SysCallError("Could not get loop information", exit_code=1)
except SysCallError:
+ print("Not a loop device, trying uevent rules.")
information = get_blockdevice_uevent(pathlib.Path(block_device).readlink().name)
else:
+ # We could not reliably get any information, perhaps the disk is clean of information?
+ print("Raising ex because:", ex.exit_code)
raise ex
+ # return instances
information = enrich_blockdevice_information(information)
@@ -244,7 +256,7 @@ def all_blockdevices(mappers=False, partitions=False, error=False) -> Dict[str,
instances[path] = Partition(path, block_device=BlockDevice(get_parent_of_partition(pathlib.Path(path))))
elif path_info.get('PTTYPE', False) is not False or path_info.get('TYPE') == 'loop':
instances[path] = BlockDevice(path, path_info)
- elif path_info.get('TYPE') == 'squashfs':
+ elif path_info.get('TYPE') in ('squashfs', 'erofs'):
# We can ignore squashfs devices (usually /dev/loop0 on Arch ISO)
continue
else:
@@ -368,7 +380,7 @@ def get_all_targets(data :Dict[str, Any], filters :Dict[str, None] = {}) -> Dict
return filters
-def get_partitions_in_use(mountpoint :str) -> List[Partition]:
+def get_partitions_in_use(mountpoint :str) -> Dict[str, Any]:
from .partition import Partition
try:
@@ -391,12 +403,20 @@ def get_partitions_in_use(mountpoint :str) -> List[Partition]:
if not type(blockdev) in (Partition, MapperDev):
continue
- for blockdev_mountpoint in blockdev.mount_information:
- block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev
+ if isinstance(blockdev, Partition):
+ for blockdev_mountpoint in blockdev.mountpoints:
+ block_devices_mountpoints[blockdev_mountpoint] = blockdev
+ else:
+ for blockdev_mountpoint in blockdev.mount_information:
+ block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev
log(f'Filtering available mounts {block_devices_mountpoints} to those under {mountpoint}', level=logging.DEBUG)
for mountpoint in list(get_all_targets(output['filesystems']).keys()):
+ # Since all_blockdevices() returns PosixPath objects, we need to convert
+ # findmnt paths to pathlib.Path() first:
+ mountpoint = pathlib.Path(mountpoint)
+
if mountpoint in block_devices_mountpoints:
if mountpoint not in mounts:
mounts[mountpoint] = block_devices_mountpoints[mountpoint]
@@ -433,9 +453,10 @@ def disk_layouts() -> Optional[Dict[str, Any]]:
def encrypted_partitions(blockdevices :Dict[str, Any]) -> bool:
- for partition in blockdevices.values():
- if partition.get('encrypted', False):
- yield partition
+ for blockdevice in blockdevices.values():
+ for partition in blockdevice.get('partitions', []):
+ if partition.get('encrypted', False):
+ yield partition
def find_partition_by_mountpoint(block_devices :List[BlockDevice], relative_mountpoint :str) -> Partition:
for device in block_devices:
@@ -468,13 +489,14 @@ def convert_device_to_uuid(path :str) -> str:
raise DiskError(f"Could not retrieve the UUID of {path} within a timely manner.")
+
def has_mountpoint(partition: Union[dict,Partition,MapperDev], target: str, strict: bool = True) -> bool:
""" Determine if a certain partition is mounted (or has a mountpoint) as specific target (path)
Coded for clarity rather than performance
Input parms:
:parm partition the partition we check
- :type Either a Partition object or a dict with the contents of a partition definiton in the disk_layouts schema
+ :type Either a Partition object or a dict with the contents of a partition definition in the disk_layouts schema
:parm target (a string representing a mount path we want to check for.
:type str
@@ -484,10 +506,12 @@ def has_mountpoint(partition: Union[dict,Partition,MapperDev], target: str, stri
"""
# we create the mountpoint list
if isinstance(partition,dict):
- subvols = partition.get('btrfs',{}).get('subvolumes',{})
- mountpoints = [partition.get('mountpoint'),] + [subvols[subvol] if isinstance(subvols[subvol],str) or not subvols[subvol] else subvols[subvol].get('mountpoint') for subvol in subvols]
+ subvolumes: List[Subvolume] = partition.get('btrfs',{}).get('subvolumes', [])
+ mountpoints = [partition.get('mountpoint')]
+ mountpoints += [volume.mountpoint for volume in subvolumes]
else:
mountpoints = [partition.mountpoint,] + [subvol.target for subvol in partition.subvolumes]
+
# we check
if strict or target == '/':
if target in mountpoints:
diff --git a/archinstall/lib/disk/mapperdev.py b/archinstall/lib/disk/mapperdev.py
index 913dbc13..71ef2a79 100644
--- a/archinstall/lib/disk/mapperdev.py
+++ b/archinstall/lib/disk/mapperdev.py
@@ -10,7 +10,7 @@ from ..general import SysCommand
from ..output import log
if TYPE_CHECKING:
- from .btrfs import BtrfsSubvolume
+ from .btrfs import BtrfsSubvolumeInfo
@dataclass
class MapperDev:
@@ -37,12 +37,12 @@ class MapperDev:
for slave in glob.glob(f"/sys/class/block/{dm_device.name}/slaves/*"):
partition_belonging_to_dmcrypt_device = pathlib.Path(slave).name
-
+
try:
uevent_data = SysCommand(f"blkid -o export /dev/{partition_belonging_to_dmcrypt_device}").decode()
except SysCallError as error:
log(f"Could not get information on device /dev/{partition_belonging_to_dmcrypt_device}: {error}", level=logging.ERROR, fg="red")
-
+
information = uevent(uevent_data)
block_device = BlockDevice(get_parent_of_partition('/dev/' / pathlib.Path(information['DEVNAME'])))
@@ -65,9 +65,13 @@ class MapperDev:
return None
@property
+ def mountpoints(self) -> List[Dict[str, Any]]:
+ return [obj['target'] for obj in self.mount_information]
+
+ @property
def mount_information(self) -> List[Dict[str, Any]]:
from .helpers import find_mountpoint
- return list(find_mountpoint(self.path))
+ return [{**obj, 'target' : pathlib.Path(obj.get('target', '/dev/null'))} for obj in find_mountpoint(self.path)]
@property
def filesystem(self) -> Optional[str]:
@@ -75,10 +79,10 @@ class MapperDev:
return get_filesystem_type(self.path)
@property
- def subvolumes(self) -> Iterator['BtrfsSubvolume']:
+ def subvolumes(self) -> Iterator['BtrfsSubvolumeInfo']:
from .btrfs import subvolume_info_from_path
for mountpoint in self.mount_information:
if target := mountpoint.get('target'):
if subvolume := subvolume_info_from_path(pathlib.Path(target)):
- yield subvolume \ No newline at end of file
+ yield subvolume
diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py
index 73c88597..56a7d436 100644
--- a/archinstall/lib/disk/partition.py
+++ b/archinstall/lib/disk/partition.py
@@ -1,60 +1,83 @@
import glob
-import pathlib
import time
import logging
import json
import os
import hashlib
+import typing
+from dataclasses import dataclass
+from pathlib import Path
from typing import Optional, Dict, Any, List, Union, Iterator
from .blockdevice import BlockDevice
-from .helpers import find_mountpoint, get_filesystem_type, convert_size_to_gb, split_bind_name
+from .helpers import get_filesystem_type, convert_size_to_gb, split_bind_name
from ..storage import storage
from ..exceptions import DiskError, SysCallError, UnknownFilesystemFormat
from ..output import log
from ..general import SysCommand
from .btrfs.btrfs_helpers import subvolume_info_from_path
-from .btrfs.btrfssubvolume import BtrfsSubvolume
+from .btrfs.btrfssubvolumeinfo import BtrfsSubvolumeInfo
+
+
+@dataclass
+class PartitionInfo:
+ pttype: str
+ partuuid: str
+ uuid: str
+ start: Optional[int]
+ end: Optional[int]
+ bootable: bool
+ size: float
+ sector_size: int
+ filesystem_type: str
+ mountpoints: List[Path]
+
+ def get_first_mountpoint(self) -> Optional[Path]:
+ if len(self.mountpoints) > 0:
+ return self.mountpoints[0]
+ return None
+
class Partition:
- def __init__(self,
+ def __init__(
+ self,
path: str,
block_device: BlockDevice,
part_id :Optional[str] = None,
filesystem :Optional[str] = None,
mountpoint :Optional[str] = None,
encrypted :bool = False,
- autodetect_filesystem :bool = True):
-
+ autodetect_filesystem :bool = True,
+ ):
if not part_id:
part_id = os.path.basename(path)
- self.block_device = block_device
- if type(self.block_device) is str:
+ if type(block_device) is str:
raise ValueError(f"Partition()'s 'block_device' parameter has to be a archinstall.BlockDevice() instance!")
- self.path = path
- self.part_id = part_id
- self.target_mountpoint = mountpoint
- self.filesystem = filesystem
+ self.block_device = block_device
+ self._path = path
+ self._part_id = part_id
+ self._target_mountpoint = mountpoint
self._encrypted = None
- self.encrypted = encrypted
- self.allow_formatting = False
+ self._encrypted = encrypted
+ self._wipe = False
+ self._type = 'primary'
if mountpoint:
self.mount(mountpoint)
- try:
- self.mount_information = list(find_mountpoint(self.path))
- except DiskError:
- self.mount_information = [{}]
+ self._partition_info = self._fetch_information()
- if not self.filesystem and autodetect_filesystem:
- self.filesystem = get_filesystem_type(path)
+ if not autodetect_filesystem and filesystem:
+ self._partition_info.filesystem_type = filesystem
- if self.filesystem == 'crypto_LUKS':
- self.encrypted = True
+ if self._partition_info.filesystem_type == 'crypto_LUKS':
+ self._encrypted = True
+ # I hate doint this but I'm currently unsure where this
+ # is acutally used to be able to fix the typing issues properly
+ @typing.no_type_check
def __lt__(self, left_comparitor :BlockDevice) -> bool:
if type(left_comparitor) == Partition:
left_comparitor = left_comparitor.path
@@ -62,147 +85,175 @@ class Partition:
left_comparitor = str(left_comparitor)
# The goal is to check if /dev/nvme0n1p1 comes before /dev/nvme0n1p5
- return self.path < left_comparitor
+ return self._path < left_comparitor
def __repr__(self, *args :str, **kwargs :str) -> str:
mount_repr = ''
- if self.mountpoint:
- mount_repr = f", mounted={self.mountpoint}"
- elif self.target_mountpoint:
- mount_repr = f", rel_mountpoint={self.target_mountpoint}"
+ if mountpoint := self._partition_info.get_first_mountpoint():
+ mount_repr = f", mounted={mountpoint}"
+ elif self._target_mountpoint:
+ mount_repr = f", rel_mountpoint={self._target_mountpoint}"
+
+ classname = self.__class__.__name__
if self._encrypted:
- return f'Partition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, parent={self.real_device}, fs={self.filesystem}{mount_repr})'
+ return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, parent={self.real_device}, fs={self._partition_info.filesystem_type}{mount_repr})'
else:
- return f'Partition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})'
+ return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, fs={self._partition_info.filesystem_type}{mount_repr})'
+
+ def as_json(self) -> Dict[str, Any]:
+ """
+ this is used for the table representation of the partition (see FormattedOutput)
+ """
+ partition_info = {
+ 'type': self._type,
+ 'PARTUUID': self.part_uuid,
+ 'wipe': self._wipe,
+ 'boot': self.boot,
+ 'ESP': self.boot,
+ 'mountpoint': self._target_mountpoint,
+ 'encrypted': self._encrypted,
+ 'start': self.start,
+ 'size': self.end,
+ 'filesystem': self._partition_info.filesystem_type
+ }
+
+ return partition_info
def __dump__(self) -> Dict[str, Any]:
+ # TODO remove this in favour of as_json
return {
- 'type': 'primary',
- 'PARTUUID': self._safe_uuid,
- 'wipe': self.allow_formatting,
+ 'type': self._type,
+ 'PARTUUID': self.part_uuid,
+ 'wipe': self._wipe,
'boot': self.boot,
'ESP': self.boot,
- 'mountpoint': self.target_mountpoint,
+ 'mountpoint': self._target_mountpoint,
'encrypted': self._encrypted,
'start': self.start,
'size': self.end,
'filesystem': {
- 'format': get_filesystem_type(self.path)
+ 'format': self._partition_info.filesystem_type
}
}
- @property
- def mountpoint(self) -> Optional[str]:
- try:
- data = json.loads(SysCommand(f"findmnt --json -R {self.path}").decode())
- for filesystem in data['filesystems']:
- return pathlib.Path(filesystem.get('target'))
+ def _call_lsblk(self) -> Dict[str, Any]:
+ self.partprobe()
+ # This sleep might be overkill, but lsblk is known to
+ # work against a chaotic cache that can change during call
+ # causing no information to be returned (blkid is better)
+ # time.sleep(1)
+ # TODO: Maybe incorporate a re-try system here based on time.sleep(max(0.1, storage.get('DISK_TIMEOUTS', 1)))
+
+ try:
+ output = SysCommand(f"lsblk --json -b -o+LOG-SEC,SIZE,PTTYPE,PARTUUID,UUID,FSTYPE {self.device_path}").decode('UTF-8')
except SysCallError as error:
- # Not mounted anywhere most likely
- log(f"Could not locate mount information for {self.path}: {error}", level=logging.DEBUG, fg="grey")
- pass
+ # It appears as if lsblk can return exit codes like 8192 to indicate something.
+ # But it does return output so we'll try to catch it.
+ output = error.worker.decode('UTF-8')
- return None
+ if output:
+ lsblk_info = json.loads(output)
+ return lsblk_info
- @property
- def sector_size(self) -> Optional[int]:
- output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.device_path}").decode('UTF-8'))
+ raise DiskError(f'Failed to read disk "{self.device_path}" with lsblk')
- for device in output['blockdevices']:
- return device.get('log-sec', None)
+ def _call_sfdisk(self) -> Dict[str, Any]:
+ output = SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')
- @property
- def start(self) -> Optional[str]:
- output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
+ if output:
+ sfdisk_info = json.loads(output)
+ partitions = sfdisk_info.get('partitiontable', {}).get('partitions', [])
+ node = list(filter(lambda x: x['node'] == self._path, partitions))
- for partition in output.get('partitiontable', {}).get('partitions', []):
- if partition['node'] == self.path:
- return partition['start'] # * self.sector_size
+ if len(node) > 0:
+ return node[0]
- @property
- def end(self) -> Optional[str]:
- # TODO: actually this is size in sectors unit
- # TODO: Verify that the logic holds up, that 'size' is the size without 'start' added to it.
- output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
+ return {}
+
+ raise DiskError(f'Failed to read disk "{self.block_device.path}" with sfdisk')
+
+ def _fetch_information(self) -> PartitionInfo:
+ lsblk_info = self._call_lsblk()
+ sfdisk_info = self._call_sfdisk()
+
+ if not (device := lsblk_info.get('blockdevices', [None])[0]):
+ raise DiskError(f'Failed to retrieve information for "{self.device_path}" with lsblk')
- for partition in output.get('partitiontable', {}).get('partitions', []):
- if partition['node'] == self.path:
- return partition['size'] # * self.sector_size
+ mountpoints = [Path(mountpoint) for mountpoint in device['mountpoints'] if mountpoint]
+ bootable = sfdisk_info.get('bootable', False) or sfdisk_info.get('type', '') == 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B'
+
+ return PartitionInfo(
+ pttype=device['pttype'],
+ partuuid=device['partuuid'],
+ uuid=device['uuid'],
+ sector_size=device['log-sec'],
+ size=convert_size_to_gb(device['size']),
+ start=sfdisk_info.get('start', None),
+ end=sfdisk_info.get('size', None),
+ bootable=bootable,
+ filesystem_type=device['fstype'],
+ mountpoints=mountpoints
+ )
@property
- def end_sectors(self) -> Optional[str]:
- output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
+ def target_mountpoint(self) -> Optional[str]:
+ return self._target_mountpoint
- for partition in output.get('partitiontable', {}).get('partitions', []):
- if partition['node'] == self.path:
- return partition['start'] + partition['size']
+ @property
+ def path(self) -> str:
+ return self._path
@property
- def size(self) -> Optional[float]:
- for i in range(storage['DISK_RETRY_ATTEMPTS']):
- self.partprobe()
- time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i))
+ def filesystem(self) -> str:
+ return self._partition_info.filesystem_type
- try:
- lsblk = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.device_path}").decode())
+ @property
+ def mountpoint(self) -> Optional[Path]:
+ if len(self.mountpoints) > 0:
+ return self.mountpoints[0]
+ return None
- for device in lsblk['blockdevices']:
- return convert_size_to_gb(device['size'])
- except SysCallError as error:
- if error.exit_code == 8192:
- return None
- else:
- raise error
+ @property
+ def mountpoints(self) -> List[Path]:
+ return self._partition_info.mountpoints
@property
- def boot(self) -> bool:
- output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
-
- # Get the bootable flag from the sfdisk output:
- # {
- # "partitiontable": {
- # "device":"/dev/loop0",
- # "partitions": [
- # {"node":"/dev/loop0p1", "start":2048, "size":10483712, "type":"83", "bootable":true}
- # ]
- # }
- # }
-
- for partition in output.get('partitiontable', {}).get('partitions', []):
- if partition['node'] == self.path:
- # first condition is for MBR disks, second for GPT disks
- return partition.get('bootable', False) or partition.get('type','') == 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B'
+ def sector_size(self) -> int:
+ return self._partition_info.sector_size
- return False
+ @property
+ def start(self) -> Optional[int]:
+ return self._partition_info.start
@property
- def partition_type(self) -> Optional[str]:
- lsblk = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.device_path}").decode('UTF-8'))
+ def end(self) -> Optional[int]:
+ return self._partition_info.end
- for device in lsblk['blockdevices']:
- return device['pttype']
+ @property
+ def end_sectors(self) -> Optional[int]:
+ start = self._partition_info.start
+ end = self._partition_info.end
+ if start and end:
+ return start + end
+ return None
@property
- def part_uuid(self) -> Optional[str]:
- """
- Returns the PARTUUID as returned by lsblk.
- This is more reliable than relying on /dev/disk/by-partuuid as
- it doesn't seam to be able to detect md raid partitions.
- For bind mounts all the subvolumes share the same uuid
- """
- for i in range(storage['DISK_RETRY_ATTEMPTS']):
- if not self.partprobe():
- raise DiskError(f"Could not perform partprobe on {self.device_path}")
-
- time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i))
+ def size(self) -> Optional[float]:
+ return self._partition_info.size
- partuuid = self._safe_part_uuid
- if partuuid:
- return partuuid
+ @property
+ def boot(self) -> bool:
+ return self._partition_info.bootable
- raise DiskError(f"Could not get PARTUUID for {self.path} using 'blkid -s PARTUUID -o value {self.path}'")
+ @property
+ def partition_type(self) -> Optional[str]:
+ return self._partition_info.pttype
+
+ @property
+ def part_uuid(self) -> str:
+ return self._partition_info.partuuid
@property
def uuid(self) -> Optional[str]:
@@ -232,7 +283,7 @@ class Partition:
For instance when you want to get a __repr__ of the class.
"""
if not self.partprobe():
- if self.block_device.info.get('TYPE') == 'iso9660':
+ if self.block_device.partition_type == 'iso9660':
return None
log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG)
@@ -240,7 +291,7 @@ class Partition:
try:
return SysCommand(f'blkid -s UUID -o value {self.device_path}').decode('UTF-8').strip()
except SysCallError as error:
- if self.block_device.info.get('TYPE') == 'iso9660':
+ if self.block_device.partition_type == 'iso9660':
# Parent device is a Optical Disk (.iso dd'ed onto a device for instance)
return None
@@ -254,7 +305,7 @@ class Partition:
For instance when you want to get a __repr__ of the class.
"""
if not self.partprobe():
- if self.block_device.info.get('TYPE') == 'iso9660':
+ if self.block_device.partition_type == 'iso9660':
return None
log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG)
@@ -262,37 +313,39 @@ class Partition:
try:
return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip()
except SysCallError as error:
- if self.block_device.info.get('TYPE') == 'iso9660':
+ if self.block_device.partition_type == 'iso9660':
# Parent device is a Optical Disk (.iso dd'ed onto a device for instance)
return None
log(f"Could not get PARTUUID of partition using 'blkid -s PARTUUID -o value {self.device_path}': {error}")
+ return self._partition_info.uuid
+
@property
def encrypted(self) -> Union[bool, None]:
return self._encrypted
- @encrypted.setter
- def encrypted(self, value: bool) -> None:
- self._encrypted = value
-
@property
def parent(self) -> str:
return self.real_device
@property
def real_device(self) -> str:
- for blockdevice in json.loads(SysCommand('lsblk -J').decode('UTF-8'))['blockdevices']:
- if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)):
- return f"/dev/{parent}"
- # raise DiskError(f'Could not find appropriate parent for encrypted partition {self}')
- return self.path
+ output = SysCommand('lsblk -J').decode('UTF-8')
+
+ if output:
+ for blockdevice in json.loads(output)['blockdevices']:
+ if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)):
+ return f"/dev/{parent}"
+ return self._path
+
+ raise DiskError('Unable to get disk information for command "lsblk -J"')
@property
def device_path(self) -> str:
- """ for bind mounts returns the phisical path of the partition
+ """ for bind mounts returns the physical path of the partition
"""
- device_path, bind_name = split_bind_name(self.path)
+ device_path, bind_name = split_bind_name(self._path)
return device_path
@property
@@ -300,38 +353,40 @@ class Partition:
""" for bind mounts returns the bind name (subvolume path).
Returns none if this property does not exist
"""
- device_path, bind_name = split_bind_name(self.path)
+ device_path, bind_name = split_bind_name(self._path)
return bind_name
@property
- def subvolumes(self) -> Iterator[BtrfsSubvolume]:
+ def subvolumes(self) -> Iterator[BtrfsSubvolumeInfo]:
from .helpers import findmnt
-
+
def iterate_children_recursively(information):
for child in information.get('children', []):
if target := child.get('target'):
- if subvolume := subvolume_info_from_path(pathlib.Path(target)):
- yield subvolume
+ if child.get('fstype') == 'btrfs':
+ if subvolume := subvolume_info_from_path(Path(target)):
+ yield subvolume
if child.get('children'):
for subchild in iterate_children_recursively(child):
yield subchild
- for mountpoint in self.mount_information:
- if result := findmnt(pathlib.Path(mountpoint['target'])):
- for filesystem in result.get('filesystems', []):
- if subvolume := subvolume_info_from_path(pathlib.Path(mountpoint['target'])):
- yield subvolume
+ if self._partition_info.filesystem_type == 'btrfs':
+ for mountpoint in self._partition_info.mountpoints:
+ if result := findmnt(mountpoint):
+ for filesystem in result.get('filesystems', []):
+ if subvolume := subvolume_info_from_path(mountpoint):
+ yield subvolume
- for child in iterate_children_recursively(filesystem):
- yield child
+ for child in iterate_children_recursively(filesystem):
+ yield child
def partprobe(self) -> bool:
try:
if self.block_device:
return 0 == SysCommand(f'partprobe {self.block_device.device}').exit_code
except SysCallError as error:
- log(f"Unreliable results might be given for {self.path} due to partprobe error: {error}", level=logging.DEBUG)
+ log(f"Unreliable results might be given for {self._path} due to partprobe error: {error}", level=logging.DEBUG)
return False
@@ -343,19 +398,20 @@ class Partition:
with luks2(self, storage.get('ENC_IDENTIFIER', 'ai') + 'loop', password, auto_unmount=True) as unlocked_device:
return unlocked_device.filesystem
except SysCallError:
- return None
+ pass
+ return None
def has_content(self) -> bool:
- fs_type = get_filesystem_type(self.path)
+ fs_type = self._partition_info.filesystem_type
if not fs_type or "swap" in fs_type:
return False
temporary_mountpoint = '/tmp/' + hashlib.md5(bytes(f"{time.time()}", 'UTF-8') + os.urandom(12)).hexdigest()
- temporary_path = pathlib.Path(temporary_mountpoint)
+ temporary_path = Path(temporary_mountpoint)
temporary_path.mkdir(parents=True, exist_ok=True)
- if (handle := SysCommand(f'/usr/bin/mount {self.path} {temporary_mountpoint}')).exit_code != 0:
- raise DiskError(f'Could not mount and check for content on {self.path} because: {b"".join(handle)}')
+ if (handle := SysCommand(f'/usr/bin/mount {self._path} {temporary_mountpoint}')).exit_code != 0:
+ raise DiskError(f'Could not mount and check for content on {self._path} because: {handle}')
files = len(glob.glob(f"{temporary_mountpoint}/*"))
iterations = 0
@@ -366,14 +422,14 @@ class Partition:
return True if files > 0 else False
- def encrypt(self, *args :str, **kwargs :str) -> str:
+ def encrypt(self, password: Optional[str] = None) -> str:
"""
A wrapper function for luks2() instances and the .encrypt() method of that instance.
"""
from ..luks import luks2
handle = luks2(self, None, None)
- return handle.encrypt(self, *args, **kwargs)
+ return handle.encrypt(self, password=password)
def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = [], retry :bool = True) -> bool:
"""
@@ -381,17 +437,17 @@ class Partition:
the formatting functionality and in essence the support for the given filesystem.
"""
if filesystem is None:
- filesystem = self.filesystem
+ filesystem = self._partition_info.filesystem_type
if path is None:
- path = self.path
+ path = self._path
# This converts from fat32 -> vfat to unify filesystem names
filesystem = get_mount_fs_type(filesystem)
# To avoid "unable to open /dev/x: No such file or directory"
start_wait = time.time()
- while pathlib.Path(path).exists() is False and time.time() - start_wait < 10:
+ while Path(path).exists() is False and time.time() - start_wait < 10:
time.sleep(0.025)
if log_formatting:
@@ -401,57 +457,57 @@ class Partition:
if filesystem == 'btrfs':
options = ['-f'] + options
- if 'UUID:' not in (mkfs := SysCommand(f"/usr/bin/mkfs.btrfs {' '.join(options)} {path}").decode('UTF-8')):
+ mkfs = SysCommand(f"/usr/bin/mkfs.btrfs {' '.join(options)} {path}").decode('UTF-8')
+ if mkfs and 'UUID:' not in mkfs:
raise DiskError(f'Could not format {path} with {filesystem} because: {mkfs}')
- self.filesystem = filesystem
+ self._partition_info.filesystem_type = filesystem
elif filesystem == 'vfat':
options = ['-F32'] + options
-
+ log(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}")
if (handle := SysCommand(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}")).exit_code != 0:
raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = filesystem
+ self._partition_info.filesystem_type = filesystem
elif filesystem == 'ext4':
options = ['-F'] + options
if (handle := SysCommand(f"/usr/bin/mkfs.ext4 {' '.join(options)} {path}")).exit_code != 0:
raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = filesystem
+ self._partition_info.filesystem_type = filesystem
elif filesystem == 'ext2':
options = ['-F'] + options
if (handle := SysCommand(f"/usr/bin/mkfs.ext2 {' '.join(options)} {path}")).exit_code != 0:
- raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}')
- self.filesystem = 'ext2'
-
+ raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
+ self._partition_info.filesystem_type = 'ext2'
elif filesystem == 'xfs':
options = ['-f'] + options
if (handle := SysCommand(f"/usr/bin/mkfs.xfs {' '.join(options)} {path}")).exit_code != 0:
raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = filesystem
+ self._partition_info.filesystem_type = filesystem
elif filesystem == 'f2fs':
options = ['-f'] + options
if (handle := SysCommand(f"/usr/bin/mkfs.f2fs {' '.join(options)} {path}")).exit_code != 0:
raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = filesystem
+ self._partition_info.filesystem_type = filesystem
elif filesystem == 'ntfs3':
options = ['-f'] + options
if (handle := SysCommand(f"/usr/bin/mkfs.ntfs -Q {' '.join(options)} {path}")).exit_code != 0:
raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = filesystem
+ self._partition_info.filesystem_type = filesystem
elif filesystem == 'crypto_LUKS':
# from ..luks import luks2
# encrypted_partition = luks2(self, None, None)
# encrypted_partition.format(path)
- self.filesystem = filesystem
+ self._partition_info.filesystem_type = filesystem
else:
raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.")
@@ -460,13 +516,13 @@ class Partition:
if retry is True:
log(f"Retrying in {storage.get('DISK_TIMEOUTS', 1)} seconds.", level=logging.WARNING, fg="orange")
time.sleep(storage.get('DISK_TIMEOUTS', 1))
-
+
return self.format(filesystem, path, log_formatting, options, retry=False)
if get_filesystem_type(path) == 'crypto_LUKS' or get_filesystem_type(self.real_device) == 'crypto_LUKS':
- self.encrypted = True
+ self._encrypted = True
else:
- self.encrypted = False
+ self._encrypted = False
return True
@@ -478,18 +534,18 @@ class Partition:
if parent := self.find_parent_of(child, name, parent=data['name']):
return parent
+ return None
+
def mount(self, target :str, fs :Optional[str] = None, options :str = '') -> bool:
- if not self.mountpoint:
+ if not self._partition_info.get_first_mountpoint():
log(f'Mounting {self} to {target}', level=logging.INFO)
if not fs:
- if not self.filesystem:
- raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.')
- fs = self.filesystem
+ fs = self._partition_info.filesystem_type
fs_type = get_mount_fs_type(fs)
- pathlib.Path(target).mkdir(parents=True, exist_ok=True)
+ Path(target).mkdir(parents=True, exist_ok=True)
if self.bind_name:
device_path = self.device_path
@@ -499,7 +555,7 @@ class Partition:
else:
options = f"subvol={self.bind_name}"
else:
- device_path = self.path
+ device_path = self._path
try:
if options:
mnt_handle = SysCommand(f"/usr/bin/mount -t {fs_type} -o {options} {device_path} {target}")
@@ -508,7 +564,7 @@ class Partition:
# TODO: Should be redundant to check for exit_code
if mnt_handle.exit_code != 0:
- raise DiskError(f"Could not mount {self.path} to {target} using options {options}")
+ raise DiskError(f"Could not mount {self._path} to {target} using options {options}")
except SysCallError as err:
raise err
@@ -517,19 +573,17 @@ class Partition:
return False
def unmount(self) -> bool:
- worker = SysCommand(f"/usr/bin/umount {self.path}")
+ worker = SysCommand(f"/usr/bin/umount {self._path}")
+ exit_code = worker.exit_code
# Without to much research, it seams that low error codes are errors.
# And above 8k is indicators such as "/dev/x not mounted.".
# So anything in between 0 and 8k are errors (?).
- if 0 < worker.exit_code < 8000:
- raise SysCallError(f"Could not unmount {self.path} properly: {worker}", exit_code=worker.exit_code)
+ if exit_code and 0 < exit_code < 8000:
+ raise SysCallError(f"Could not unmount {self._path} properly: {worker}", exit_code=exit_code)
return True
- def umount(self) -> bool:
- return self.unmount()
-
def filesystem_supported(self) -> bool:
"""
The support for a filesystem (this partition) is tested by calling
@@ -538,7 +592,7 @@ class Partition:
2. UnknownFilesystemFormat that indicates that we don't support the given filesystem type
"""
try:
- self.format(self.filesystem, '/dev/null', log_formatting=False, allow_formatting=True)
+ self.format(self._partition_info.filesystem_type, '/dev/null', log_formatting=False)
except (SysCallError, DiskError):
pass # We supported it, but /dev/null is not formattable as expected so the mkfs call exited with an error code
except UnknownFilesystemFormat as err:
diff --git a/archinstall/lib/disk/user_guides.py b/archinstall/lib/disk/user_guides.py
index 5fa6bfdc..5809c073 100644
--- a/archinstall/lib/disk/user_guides.py
+++ b/archinstall/lib/disk/user_guides.py
@@ -3,6 +3,8 @@ import logging
from typing import Optional, Dict, Any, List, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
+from ..models.subvolume import Subvolume
+
if TYPE_CHECKING:
from .blockdevice import BlockDevice
_: Any
@@ -107,17 +109,14 @@ def suggest_single_disk_layout(block_device :BlockDevice,
# https://unix.stackexchange.com/questions/246976/btrfs-subvolume-uuid-clash
# https://github.com/classy-giraffe/easy-arch/blob/main/easy-arch.sh
layout[block_device.path]['partitions'][1]['btrfs'] = {
- "subvolumes" : {
- "@":"/",
- "@home": "/home",
- "@log": "/var/log",
- "@pkg": "/var/cache/pacman/pkg",
- "@.snapshots": "/.snapshots"
- }
+ 'subvolumes': [
+ Subvolume('@', '/'),
+ Subvolume('@home', '/home'),
+ Subvolume('@log', '/var/log'),
+ Subvolume('@pkg', '/var/cache/pacman/pkg'),
+ Subvolume('@.snapshots', '/.snapshots')
+ ]
}
- # else:
- # pass # ... implement a guided setup
-
elif using_home_partition:
# If we don't want to use subvolumes,
# But we want to be able to re-use data between re-installs..
diff --git a/archinstall/lib/disk/validators.py b/archinstall/lib/disk/validators.py
index fd1b7f33..54808886 100644
--- a/archinstall/lib/disk/validators.py
+++ b/archinstall/lib/disk/validators.py
@@ -7,13 +7,11 @@ def valid_parted_position(pos :str) -> bool:
if pos.isdigit():
return True
- if pos[-1] == '%' and pos[:-1].isdigit():
+ if pos.lower().endswith('b') and pos[:-1].isdigit():
return True
- if pos[-3:].lower() in ['mib', 'kib', 'b', 'tib'] and pos[:-3].replace(".", "", 1).isdigit():
- return True
-
- if pos[-2:].lower() in ['kb', 'mb', 'gb', 'tb'] and pos[:-2].replace(".", "", 1).isdigit():
+ if any(pos.lower().endswith(size) and pos[:-len(size)].replace(".", "", 1).isdigit()
+ for size in ['%', 'kb', 'mb', 'gb', 'tb', 'kib', 'mib', 'gib', 'tib']):
return True
return False