Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Girtler <blackrabbit256@gmail.com>2022-06-07 01:28:46 +1000
committerGitHub <noreply@github.com>2022-06-06 17:28:46 +0200
commita7ca037a26de53fd242f89bc6a90fd53337b4d13 (patch)
tree5919ce42f32a7dac45b543ac15835a11086bd41b
parent2d4b2620462a0fb4c9496ed0629d7ab8930fc73a (diff)
Update the subvolume menu - fix for #1278 (#1297)
* Update subvolume * Add mypy compliance Co-authored-by: Daniel Girtler <girtler.daniel@gmail.com> Co-authored-by: Anton Hvornum <anton@hvornum.se>
-rw-r--r--.github/workflows/mypy.yaml2
-rw-r--r--archinstall/__init__.py6
-rw-r--r--archinstall/lib/disk/btrfs/__init__.py130
-rw-r--r--archinstall/lib/disk/btrfs/btrfs_helpers.py94
-rw-r--r--archinstall/lib/disk/btrfs/btrfspartition.py6
-rw-r--r--archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py (renamed from archinstall/lib/disk/btrfs/btrfssubvolume.py)5
-rw-r--r--archinstall/lib/disk/helpers.py9
-rw-r--r--archinstall/lib/disk/mapperdev.py10
-rw-r--r--archinstall/lib/disk/partition.py10
-rw-r--r--archinstall/lib/disk/user_guides.py19
-rw-r--r--archinstall/lib/installer.py55
-rw-r--r--archinstall/lib/menu/global_menu.py33
-rw-r--r--archinstall/lib/menu/list_manager.py39
-rw-r--r--archinstall/lib/models/subvolume.py68
-rw-r--r--archinstall/lib/models/users.py6
-rw-r--r--archinstall/lib/user_interaction/partitioning_conf.py12
-rw-r--r--archinstall/lib/user_interaction/subvolume_config.py201
17 files changed, 273 insertions, 432 deletions
diff --git a/.github/workflows/mypy.yaml b/.github/workflows/mypy.yaml
index 8463afda..b0901b38 100644
--- a/.github/workflows/mypy.yaml
+++ b/.github/workflows/mypy.yaml
@@ -15,4 +15,4 @@ jobs:
# one day this will be enabled
# run: mypy --strict --module archinstall || exit 0
- name: run mypy
- run: mypy --follow-imports=silent archinstall/lib/menu/selection_menu.py archinstall/lib/menu/global_menu.py archinstall/lib/models/network_configuration.py archinstall/lib/menu/list_manager.py archinstall/lib/user_interaction/network_conf.py archinstall/lib/models/users.py archinstall/lib/translation.py
+ run: mypy --follow-imports=silent archinstall/lib/menu/selection_menu.py archinstall/lib/menu/global_menu.py archinstall/lib/models/network_configuration.py archinstall/lib/menu/list_manager.py archinstall/lib/user_interaction/network_conf.py archinstall/lib/models/users.py archinstall/lib/user_interaction/subvolume_config.py archinstall/lib/disk/btrfs/btrfs_helpers.py archinstall/lib/translation.py
diff --git a/archinstall/__init__.py b/archinstall/__init__.py
index abcad3ba..1a360c67 100644
--- a/archinstall/__init__.py
+++ b/archinstall/__init__.py
@@ -226,8 +226,6 @@ def post_process_arguments(arguments):
load_plugin(arguments['plugin'])
if arguments.get('disk_layouts', None) is not None:
- # if 'disk_layouts' not in storage:
- # storage['disk_layouts'] = {}
layout_storage = {}
if not json_stream_to_structure('--disk_layouts',arguments['disk_layouts'],layout_storage):
exit(1)
@@ -236,10 +234,12 @@ def post_process_arguments(arguments):
arguments['harddrives'] = [disk for disk in layout_storage]
# backward compatibility. Change partition.format for partition.wipe
for disk in layout_storage:
- for i,partition in enumerate(layout_storage[disk].get('partitions',[])):
+ for i, partition in enumerate(layout_storage[disk].get('partitions',[])):
if 'format' in partition:
partition['wipe'] = partition['format']
del partition['format']
+ elif 'btrfs' in partition:
+ partition['btrfs']['subvolumes'] = Subvolume.parse_arguments(partition['btrfs']['subvolumes'])
arguments['disk_layouts'] = layout_storage
load_config()
diff --git a/archinstall/lib/disk/btrfs/__init__.py b/archinstall/lib/disk/btrfs/__init__.py
index 90c58145..3c183112 100644
--- a/archinstall/lib/disk/btrfs/__init__.py
+++ b/archinstall/lib/disk/btrfs/__init__.py
@@ -2,8 +2,7 @@ from __future__ import annotations
import pathlib
import glob
import logging
-import re
-from typing import Union, Dict, TYPE_CHECKING, Any, Iterator
+from typing import Union, Dict, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
@@ -15,30 +14,15 @@ from .btrfs_helpers import (
setup_subvolumes as setup_subvolumes,
mount_subvolume as mount_subvolume
)
-from .btrfssubvolume import BtrfsSubvolume as BtrfsSubvolume
+from .btrfssubvolumeinfo import BtrfsSubvolumeInfo as BtrfsSubvolume
from .btrfspartition import BTRFSPartition as BTRFSPartition
-from ..helpers import get_mount_info
from ...exceptions import DiskError, Deprecated
from ...general import SysCommand
from ...output import log
-from ...exceptions import SysCallError
-def get_subvolume_info(path :pathlib.Path) -> Dict[str, Any]:
- try:
- output = SysCommand(f"btrfs subvol show {path}").decode()
- except SysCallError as error:
- print('Error:', error)
- result = {}
- for line in output.replace('\r\n', '\n').split('\n'):
- if ':' in line:
- key, val = line.replace('\t', '').split(':', 1)
- result[key.strip().lower().replace(' ', '_')] = val.strip()
-
- return result
-
-def create_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str]) -> bool:
+def create_subvolume(installation: Installer, subvolume_location :Union[pathlib.Path, str]) -> bool:
"""
This function uses btrfs to create a subvolume.
@@ -71,112 +55,6 @@ def create_subvolume(installation :Installer, subvolume_location :Union[pathlib.
if (cmd := SysCommand(f"btrfs subvolume create {target}")).exit_code != 0:
raise DiskError(f"Could not create a subvolume at {target}: {cmd}")
-def _has_option(option :str,options :list) -> bool:
- """ auxiliary routine to check if an option is present in a list.
- we check if the string appears in one of the options, 'cause it can appear in several forms (option, option=val,...)
- """
- if not options:
- return False
-
- for item in options:
- if option in item:
- return True
-
- return False
-
-def manage_btrfs_subvolumes(installation :Installer,
- partition :Dict[str, str],) -> list:
+def manage_btrfs_subvolumes(installation :Installer, partition :Dict[str, str]) -> list:
raise Deprecated("Use setup_subvolumes() instead.")
-
- from copy import deepcopy
- """ we do the magic with subvolumes in a centralized place
- parameters:
- * the installation object
- * the partition dictionary entry which represents the physical partition
- returns
- * mountpoinst, the list which contains all the "new" partititon to be mounted
-
- We expect the partition has been mounted as / , and it to be unmounted after the processing
- Then we create all the subvolumes inside btrfs as demand
- We clone then, both the partition dictionary and the object inside it and adapt it to the subvolume needs
- Then we return a list of "new" partitions to be processed as "normal" partitions
- # TODO For encrypted devices we need some special processing prior to it
- """
- # We process each of the pairs <subvolume name: mount point | None | mount info dict>
- # th mount info dict has an entry for the path of the mountpoint (named 'mountpoint') and 'options' which is a list
- # of mount options (or similar used by brtfs)
- mountpoints = []
- subvolumes = partition['btrfs']['subvolumes']
- for name, right_hand in subvolumes.items():
- try:
- # we normalize the subvolume name (getting rid of slash at the start if exists. In our implementation has no semantic load - every subvolume is created from the top of the hierarchy- and simplifies its further use
- if name.startswith('/'):
- name = name[1:]
- # renormalize the right hand.
- location = None
- subvol_options = []
- # no contents, so it is not to be mounted
- if not right_hand:
- location = None
- # just a string. per backward compatibility the mount point
- elif isinstance(right_hand,str):
- location = right_hand
- # a dict. two elements 'mountpoint' (obvious) and and a mount options list ¿?
- elif isinstance(right_hand,dict):
- location = right_hand.get('mountpoint',None)
- subvol_options = right_hand.get('options',[])
- # we create the subvolume
- create_subvolume(installation,name)
- # Make the nodatacow processing now
- # It will be the main cause of creation of subvolumes which are not to be mounted
- # it is not an options which can be established by subvolume (but for whole file systems), and can be
- # set up via a simple attribute change in a directory (if empty). And here the directories are brand new
- if 'nodatacow' in subvol_options:
- if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0:
- raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}")
- # entry is deleted so nodatacow doesn't propagate to the mount options
- del subvol_options[subvol_options.index('nodatacow')]
- # Make the compress processing now
- # it is not an options which can be established by subvolume (but for whole file systems), and can be
- # set up via a simple attribute change in a directory (if empty). And here the directories are brand new
- # in this way only zstd compression is activaded
- # TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated
- if 'compress' in subvol_options:
- if not _has_option('compress',partition.get('filesystem',{}).get('mount_options',[])):
- if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0:
- raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}")
- # entry is deleted so compress doesn't propagate to the mount options
- del subvol_options[subvol_options.index('compress')]
- # END compress processing.
- # we do not mount if THE basic partition will be mounted or if we exclude explicitly this subvolume
- if not partition['mountpoint'] and location is not None:
- # we begin to create a fake partition entry. First we copy the original -the one that corresponds to
- # the primary partition. We make a deepcopy to avoid altering the original content in any case
- fake_partition = deepcopy(partition)
- # we start to modify entries in the "fake partition" to match the needs of the subvolumes
- # to avoid any chance of entering in a loop (not expected) we delete the list of subvolumes in the copy
- del fake_partition['btrfs']
- fake_partition['encrypted'] = False
- fake_partition['generate-encryption-key-file'] = False
- # Mount destination. As of now the right hand part
- fake_partition['mountpoint'] = location
- # we load the name in an attribute called subvolume, but i think it is not needed anymore, 'cause the mount logic uses a different path.
- fake_partition['subvolume'] = name
- # here we add the special mount options for the subvolume, if any.
- # if the original partition['options'] is not a list might give trouble
- if fake_partition.get('filesystem',{}).get('mount_options',[]):
- fake_partition['filesystem']['mount_options'].extend(subvol_options)
- else:
- fake_partition['filesystem']['mount_options'] = subvol_options
- # Here comes the most exotic part. The dictionary attribute 'device_instance' contains an instance of Partition. This instance will be queried along the mount process at the installer.
- # As the rest will query there the path of the "partition" to be mounted, we feed it with the bind name needed to mount subvolumes
- # As we made a deepcopy we have a fresh instance of this object we can manipulate problemless
- fake_partition['device_instance'].path = f"{partition['device_instance'].path}[/{name}]"
-
- # Well, now that this "fake partition" is ready, we add it to the list of the ones which are to be mounted,
- # as "normal" ones
- mountpoints.append(fake_partition)
- except Exception as e:
- raise e
- return mountpoints
diff --git a/archinstall/lib/disk/btrfs/btrfs_helpers.py b/archinstall/lib/disk/btrfs/btrfs_helpers.py
index 5fa94314..ab528388 100644
--- a/archinstall/lib/disk/btrfs/btrfs_helpers.py
+++ b/archinstall/lib/disk/btrfs/btrfs_helpers.py
@@ -1,72 +1,42 @@
-import pathlib
import logging
-from typing import Optional
+from pathlib import Path
+from typing import Optional, Dict, Any, TYPE_CHECKING
+from ...models.subvolume import Subvolume
from ...exceptions import SysCallError, DiskError
from ...general import SysCommand
from ...output import log
from ..helpers import get_mount_info
-from .btrfssubvolume import BtrfsSubvolume
+from .btrfssubvolumeinfo import BtrfsSubvolumeInfo
+if TYPE_CHECKING:
+ from .btrfspartition import BTRFSPartition
+ from ...installer import Installer
-def mount_subvolume(installation, device, name, subvolume_information):
- # we normalize the subvolume name (getting rid of slash at the start if exists. In our implementation has no semantic load.
- # Every subvolume is created from the top of the hierarchy- and simplifies its further use
- name = name.lstrip('/')
-
- # renormalize the right hand.
- mountpoint = subvolume_information.get('mountpoint', None)
- if not mountpoint:
- return None
-
- if type(mountpoint) == str:
- mountpoint = pathlib.Path(mountpoint)
- installation_target = installation.target
- if type(installation_target) == str:
- installation_target = pathlib.Path(installation_target)
+def mount_subvolume(installation: 'Installer', device: 'BTRFSPartition', subvolume: Subvolume):
+ # we normalize the subvolume name (getting rid of slash at the start if exists.
+ # In our implementation has no semantic load.
+ # Every subvolume is created from the top of the hierarchy- and simplifies its further use
+ name = subvolume.name.lstrip('/')
+ mountpoint = Path(subvolume.mountpoint)
+ installation_target = Path(installation.target)
mountpoint = installation_target / mountpoint.relative_to(mountpoint.anchor)
mountpoint.mkdir(parents=True, exist_ok=True)
-
- mount_options = subvolume_information.get('options', [])
- if not any('subvol=' in x for x in mount_options):
- mount_options += [f'subvol={name}']
+ mount_options = subvolume.options + [f'subvol={name}']
log(f"Mounting subvolume {name} on {device} to {mountpoint}", level=logging.INFO, fg="gray")
SysCommand(f"mount {device.path} {mountpoint} -o {','.join(mount_options)}")
-def setup_subvolumes(installation, partition_dict):
- """
- Taken from: ..user_guides.py
-
- partition['btrfs'] = {
- "subvolumes" : {
- "@": "/",
- "@home": "/home",
- "@log": "/var/log",
- "@pkg": "/var/cache/pacman/pkg",
- "@.snapshots": "/.snapshots"
- }
- }
- """
+def setup_subvolumes(installation: 'Installer', partition_dict: Dict[str, Any]):
log(f"Setting up subvolumes: {partition_dict['btrfs']['subvolumes']}", level=logging.INFO, fg="gray")
- for name, right_hand in partition_dict['btrfs']['subvolumes'].items():
+
+ for subvolume in partition_dict['btrfs']['subvolumes']:
# we normalize the subvolume name (getting rid of slash at the start if exists. In our implementation has no semantic load.
# Every subvolume is created from the top of the hierarchy- and simplifies its further use
- name = name.lstrip('/')
-
- # renormalize the right hand.
- # mountpoint = None
- subvol_options = []
-
- match right_hand:
- # case str(): # backwards-compatability
- # mountpoint = right_hand
- case dict():
- # mountpoint = right_hand.get('mountpoint', None)
- subvol_options = right_hand.get('options', [])
+ name = subvolume.name.lstrip('/')
# We create the subvolume using the BTRFSPartition instance.
# That way we ensure not only easy access, but also accurate mount locations etc.
@@ -76,27 +46,25 @@ def setup_subvolumes(installation, partition_dict):
# It will be the main cause of creation of subvolumes which are not to be mounted
# it is not an options which can be established by subvolume (but for whole file systems), and can be
# set up via a simple attribute change in a directory (if empty). And here the directories are brand new
- if 'nodatacow' in subvol_options:
+ if subvolume.nodatacow:
if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0:
raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}")
- # entry is deleted so nodatacow doesn't propagate to the mount options
- del subvol_options[subvol_options.index('nodatacow')]
+
# Make the compress processing now
# it is not an options which can be established by subvolume (but for whole file systems), and can be
# set up via a simple attribute change in a directory (if empty). And here the directories are brand new
# in this way only zstd compression is activaded
# TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated
- if 'compress' in subvol_options:
+ if subvolume.compress:
if not any(['compress' in filesystem_option for filesystem_option in partition_dict.get('filesystem', {}).get('mount_options', [])]):
if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0:
raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}")
- # entry is deleted so compress doesn't propagate to the mount options
- del subvol_options[subvol_options.index('compress')]
-def subvolume_info_from_path(path :pathlib.Path) -> Optional[BtrfsSubvolume]:
+
+def subvolume_info_from_path(path: Path) -> Optional[BtrfsSubvolumeInfo]:
try:
- subvolume_name = None
+ subvolume_name = ''
result = {}
for index, line in enumerate(SysCommand(f"btrfs subvolume show {path}")):
if index == 0:
@@ -110,14 +78,14 @@ def subvolume_info_from_path(path :pathlib.Path) -> Optional[BtrfsSubvolume]:
# allows for hooking in a pre-processor to do this we have to do it here:
result[key.lower().replace(' ', '_').replace('(s)', 's')] = value.strip()
- return BtrfsSubvolume(**{'full_path' : path, 'name' : subvolume_name, **result})
-
+ return BtrfsSubvolumeInfo(**{'full_path' : path, 'name' : subvolume_name, **result}) # type: ignore
except SysCallError as error:
log(f"Could not retrieve subvolume information from {path}: {error}", level=logging.WARNING, fg="orange")
return None
-def find_parent_subvolume(path :pathlib.Path, filters=[]):
+
+def find_parent_subvolume(path: Path, filters=[]) -> Optional[BtrfsSubvolumeInfo]:
# A root path cannot have a parent
if str(path) == '/':
return None
@@ -127,6 +95,8 @@ def find_parent_subvolume(path :pathlib.Path, filters=[]):
if found_mount['target'] == '/':
return None
- return find_parent_subvolume(path.parent, traverse=True, filters=[*filters, found_mount['target']])
+ return find_parent_subvolume(path.parent, filters=[*filters, found_mount['target']])
+
+ return subvolume
- return subvolume \ No newline at end of file
+ return None
diff --git a/archinstall/lib/disk/btrfs/btrfspartition.py b/archinstall/lib/disk/btrfs/btrfspartition.py
index 6f7487e4..a05f1527 100644
--- a/archinstall/lib/disk/btrfs/btrfspartition.py
+++ b/archinstall/lib/disk/btrfs/btrfspartition.py
@@ -15,7 +15,7 @@ from .btrfs_helpers import (
if TYPE_CHECKING:
from ...installer import Installer
- from .btrfssubvolume import BtrfsSubvolume
+ from .btrfssubvolumeinfo import BtrfsSubvolumeInfo
class BTRFSPartition(Partition):
def __init__(self, *args, **kwargs):
@@ -50,7 +50,7 @@ class BTRFSPartition(Partition):
for child in iterate_children(filesystem):
yield child
- def create_subvolume(self, subvolume :pathlib.Path, installation :Optional['Installer'] = None) -> 'BtrfsSubvolume':
+ def create_subvolume(self, subvolume :pathlib.Path, installation :Optional['Installer'] = None) -> 'BtrfsSubvolumeInfo':
"""
Subvolumes have to be created within a mountpoint.
This means we need to get the current installation target.
@@ -117,4 +117,4 @@ class BTRFSPartition(Partition):
# And deal with it here:
SysCommand(f"btrfs subvolume create {subvolume}")
- return subvolume_info_from_path(subvolume) \ No newline at end of file
+ return subvolume_info_from_path(subvolume)
diff --git a/archinstall/lib/disk/btrfs/btrfssubvolume.py b/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py
index bc7db612..5f5bdea6 100644
--- a/archinstall/lib/disk/btrfs/btrfssubvolume.py
+++ b/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py
@@ -16,8 +16,9 @@ from ...general import SysCommand
from ...output import log
from ...storage import storage
+
@dataclass
-class BtrfsSubvolume:
+class BtrfsSubvolumeInfo:
full_path :pathlib.Path
name :str
uuid :str
@@ -188,4 +189,4 @@ class BtrfsSubvolume:
def unmount(self, recurse :bool = True):
SysCommand(f"umount {'-R' if recurse else ''} {self.full_path}")
- log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray") \ No newline at end of file
+ log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray")
diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py
index 85c0390f..660594ed 100644
--- a/archinstall/lib/disk/helpers.py
+++ b/archinstall/lib/disk/helpers.py
@@ -8,6 +8,8 @@ import time
import glob
from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
+from ..models.subvolume import Subvolume
+
if TYPE_CHECKING:
from .partition import Partition
@@ -469,6 +471,7 @@ def convert_device_to_uuid(path :str) -> str:
raise DiskError(f"Could not retrieve the UUID of {path} within a timely manner.")
+
def has_mountpoint(partition: Union[dict,Partition,MapperDev], target: str, strict: bool = True) -> bool:
""" Determine if a certain partition is mounted (or has a mountpoint) as specific target (path)
Coded for clarity rather than performance
@@ -485,10 +488,12 @@ def has_mountpoint(partition: Union[dict,Partition,MapperDev], target: str, stri
"""
# we create the mountpoint list
if isinstance(partition,dict):
- subvols = partition.get('btrfs',{}).get('subvolumes',{})
- mountpoints = [partition.get('mountpoint'),] + [subvols[subvol] if isinstance(subvols[subvol],str) or not subvols[subvol] else subvols[subvol].get('mountpoint') for subvol in subvols]
+ subvolumes: List[Subvolume] = partition.get('btrfs',{}).get('subvolumes', [])
+ mountpoints = [partition.get('mountpoint')]
+ mountpoints += [volume.mountpoint for volume in subvolumes]
else:
mountpoints = [partition.mountpoint,] + [subvol.target for subvol in partition.subvolumes]
+
# we check
if strict or target == '/':
if target in mountpoints:
diff --git a/archinstall/lib/disk/mapperdev.py b/archinstall/lib/disk/mapperdev.py
index 913dbc13..49137ae9 100644
--- a/archinstall/lib/disk/mapperdev.py
+++ b/archinstall/lib/disk/mapperdev.py
@@ -10,7 +10,7 @@ from ..general import SysCommand
from ..output import log
if TYPE_CHECKING:
- from .btrfs import BtrfsSubvolume
+ from .btrfs import BtrfsSubvolumeInfo
@dataclass
class MapperDev:
@@ -37,12 +37,12 @@ class MapperDev:
for slave in glob.glob(f"/sys/class/block/{dm_device.name}/slaves/*"):
partition_belonging_to_dmcrypt_device = pathlib.Path(slave).name
-
+
try:
uevent_data = SysCommand(f"blkid -o export /dev/{partition_belonging_to_dmcrypt_device}").decode()
except SysCallError as error:
log(f"Could not get information on device /dev/{partition_belonging_to_dmcrypt_device}: {error}", level=logging.ERROR, fg="red")
-
+
information = uevent(uevent_data)
block_device = BlockDevice(get_parent_of_partition('/dev/' / pathlib.Path(information['DEVNAME'])))
@@ -75,10 +75,10 @@ class MapperDev:
return get_filesystem_type(self.path)
@property
- def subvolumes(self) -> Iterator['BtrfsSubvolume']:
+ def subvolumes(self) -> Iterator['BtrfsSubvolumeInfo']:
from .btrfs import subvolume_info_from_path
for mountpoint in self.mount_information:
if target := mountpoint.get('target'):
if subvolume := subvolume_info_from_path(pathlib.Path(target)):
- yield subvolume \ No newline at end of file
+ yield subvolume
diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py
index 2c9f50c2..6f25a5f7 100644
--- a/archinstall/lib/disk/partition.py
+++ b/archinstall/lib/disk/partition.py
@@ -14,7 +14,7 @@ from ..exceptions import DiskError, SysCallError, UnknownFilesystemFormat
from ..output import log
from ..general import SysCommand
from .btrfs.btrfs_helpers import subvolume_info_from_path
-from .btrfs.btrfssubvolume import BtrfsSubvolume
+from .btrfs.btrfssubvolumeinfo import BtrfsSubvolumeInfo
class Partition:
def __init__(self,
@@ -185,7 +185,7 @@ class Partition:
for i in range(storage['DISK_RETRY_ATTEMPTS']):
if not self.partprobe():
raise DiskError(f"Could not perform partprobe on {self.device_path}")
-
+
time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i))
partuuid = self._safe_part_uuid
@@ -294,9 +294,9 @@ class Partition:
return bind_name
@property
- def subvolumes(self) -> Iterator[BtrfsSubvolume]:
+ def subvolumes(self) -> Iterator[BtrfsSubvolumeInfo]:
from .helpers import findmnt
-
+
def iterate_children_recursively(information):
for child in information.get('children', []):
if target := child.get('target'):
@@ -452,7 +452,7 @@ class Partition:
if retry is True:
log(f"Retrying in {storage.get('DISK_TIMEOUTS', 1)} seconds.", level=logging.WARNING, fg="orange")
time.sleep(storage.get('DISK_TIMEOUTS', 1))
-
+
return self.format(filesystem, path, log_formatting, options, retry=False)
if get_filesystem_type(path) == 'crypto_LUKS' or get_filesystem_type(self.real_device) == 'crypto_LUKS':
diff --git a/archinstall/lib/disk/user_guides.py b/archinstall/lib/disk/user_guides.py
index 5fa6bfdc..5809c073 100644
--- a/archinstall/lib/disk/user_guides.py
+++ b/archinstall/lib/disk/user_guides.py
@@ -3,6 +3,8 @@ import logging
from typing import Optional, Dict, Any, List, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
+from ..models.subvolume import Subvolume
+
if TYPE_CHECKING:
from .blockdevice import BlockDevice
_: Any
@@ -107,17 +109,14 @@ def suggest_single_disk_layout(block_device :BlockDevice,
# https://unix.stackexchange.com/questions/246976/btrfs-subvolume-uuid-clash
# https://github.com/classy-giraffe/easy-arch/blob/main/easy-arch.sh
layout[block_device.path]['partitions'][1]['btrfs'] = {
- "subvolumes" : {
- "@":"/",
- "@home": "/home",
- "@log": "/var/log",
- "@pkg": "/var/cache/pacman/pkg",
- "@.snapshots": "/.snapshots"
- }
+ 'subvolumes': [
+ Subvolume('@', '/'),
+ Subvolume('@home', '/home'),
+ Subvolume('@log', '/var/log'),
+ Subvolume('@pkg', '/var/cache/pacman/pkg'),
+ Subvolume('@.snapshots', '/.snapshots')
+ ]
}
- # else:
- # pass # ... implement a guided setup
-
elif using_home_partition:
# If we don't want to use subvolumes,
# But we want to be able to re-use data between re-installs..
diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py
index bf296c2e..97c2492d 100644
--- a/archinstall/lib/installer.py
+++ b/archinstall/lib/installer.py
@@ -24,6 +24,7 @@ from .disk.partition import get_mount_fs_type
from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError, SysCallError
from .hsm import fido2_enroll
from .models.users import User
+from .models.subvolume import Subvolume
if TYPE_CHECKING:
_: Any
@@ -263,47 +264,25 @@ class Installer:
hsm_device_path = storage['arguments']['HSM']
fido2_enroll(hsm_device_path, partition['device_instance'], password)
- # we manage the btrfs partitions
- if any(btrfs_subvolumes := [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', {})]):
- for partition in btrfs_subvolumes:
- if mount_options := ','.join(partition.get('filesystem',{}).get('mount_options',[])):
- self.mount(partition['device_instance'], "/", options=mount_options)
- else:
- self.mount(partition['device_instance'], "/")
-
- setup_subvolumes(
- installation=self,
- partition_dict=partition
- )
+ btrfs_subvolumes = [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', [])]
- partition['device_instance'].unmount()
+ for partition in btrfs_subvolumes:
+ device_instance = partition['device_instance']
+ mount_options = partition.get('filesystem', {}).get('mount_options', [])
+ self.mount(device_instance, "/", options=','.join(mount_options))
+ setup_subvolumes(installation=self, partition_dict=partition)
+ device_instance.unmount()
# We then handle any special cases, such as btrfs
- if any(btrfs_subvolumes := [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', {})]):
- for partition_information in btrfs_subvolumes:
- for name, mountpoint in sorted(partition_information['btrfs']['subvolumes'].items(), key=lambda item: item[1]):
- btrfs_subvolume_information = {}
-
- match mountpoint:
- case str(): # backwards-compatability
- btrfs_subvolume_information['mountpoint'] = mountpoint
- btrfs_subvolume_information['options'] = []
- case dict():
- btrfs_subvolume_information['mountpoint'] = mountpoint.get('mountpoint', None)
- btrfs_subvolume_information['options'] = mountpoint.get('options', [])
- case _:
- continue
-
- if mountpoint_parsed := btrfs_subvolume_information.get('mountpoint'):
- # We cache the mount call for later
- mount_queue[mountpoint_parsed] = lambda device=partition_information['device_instance'], \
- name=name, \
- subvolume_information=btrfs_subvolume_information: mount_subvolume(
- installation=self,
- device=device,
- name=name,
- subvolume_information=subvolume_information
- )
+ for partition in btrfs_subvolumes:
+ subvolumes: List[Subvolume] = partition['btrfs']['subvolumes']
+ for subvolume in sorted(subvolumes, key=lambda item: item.mountpoint):
+ # We cache the mount call for later
+ mount_queue[subvolume.mountpoint] = lambda sub_vol=subvolume, device=partition['device_instance']: mount_subvolume(
+ installation=self,
+ device=device,
+ subvolume=sub_vol
+ )
# We mount ordinary partitions, and we sort them by the mountpoint
for partition in sorted([entry for entry in list_part if entry.get('mountpoint', False)], key=lambda part: part['mountpoint']):
diff --git a/archinstall/lib/menu/global_menu.py b/archinstall/lib/menu/global_menu.py
index cb61168d..49083517 100644
--- a/archinstall/lib/menu/global_menu.py
+++ b/archinstall/lib/menu/global_menu.py
@@ -325,22 +325,23 @@ class GlobalMenu(GeneralMenu):
def _select_harddrives(self, old_harddrives : list) -> List:
harddrives = select_harddrives(old_harddrives)
- if len(harddrives) == 0:
- prompt = _(
- "You decided to skip harddrive selection\nand will use whatever drive-setup is mounted at {} (experimental)\n"
- "WARNING: Archinstall won't check the suitability of this setup\n"
- "Do you wish to continue?"
- ).format(storage['MOUNT_POINT'])
-
- choice = Menu(prompt, Menu.yes_no(), default_option=Menu.yes(), skip=False).run()
-
- if choice.value == Menu.no():
- return self._select_harddrives(old_harddrives)
-
- # in case the harddrives got changed we have to reset the disk layout as well
- if old_harddrives != harddrives:
- self._menu_options['disk_layouts'].set_current_selection(None)
- storage['arguments']['disk_layouts'] = {}
+ if harddrives:
+ if len(harddrives) == 0:
+ prompt = _(
+ "You decided to skip harddrive selection\nand will use whatever drive-setup is mounted at {} (experimental)\n"
+ "WARNING: Archinstall won't check the suitability of this setup\n"
+ "Do you wish to continue?"
+ ).format(storage['MOUNT_POINT'])
+
+ choice = Menu(prompt, Menu.yes_no(), default_option=Menu.yes(), skip=False).run()
+
+ if choice.value == Menu.no():
+ return self._select_harddrives(old_harddrives)
+
+ # in case the harddrives got changed we have to reset the disk layout as well
+ if old_harddrives != harddrives:
+ self._menu_options['disk_layouts'].set_current_selection(None)
+ storage['arguments']['disk_layouts'] = {}
return harddrives
diff --git a/archinstall/lib/menu/list_manager.py b/archinstall/lib/menu/list_manager.py
index 7e051528..40d01ce3 100644
--- a/archinstall/lib/menu/list_manager.py
+++ b/archinstall/lib/menu/list_manager.py
@@ -137,34 +137,35 @@ class ListManager:
else:
self._default_action = [str(default_action),]
- self.header = header if header else None
- self.cancel_action = str(_('Cancel'))
- self.confirm_action = str(_('Confirm and exit'))
- self.separator = ''
- self.bottom_list = [self.confirm_action,self.cancel_action]
- self.bottom_item = [self.cancel_action]
- self.base_actions = base_actions if base_actions else [str(_('Add')),str(_('Copy')),str(_('Edit')),str(_('Delete'))]
+ self._header = header if header else None
+ self._cancel_action = str(_('Cancel'))
+ self._confirm_action = str(_('Confirm and exit'))
+ self._separator = ''
+ self._bottom_list = [self._confirm_action, self._cancel_action]
+ self._bottom_item = [self._cancel_action]
+ self._base_actions = base_actions if base_actions else [str(_('Add')), str(_('Copy')), str(_('Edit')), str(_('Delete'))]
self._original_data = copy.deepcopy(base_list)
self._data = copy.deepcopy(base_list) # as refs, changes are immediate
+
# default values for the null case
self.target: Optional[Any] = None
self.action = self._null_action
- if len(self._data) == 0 and self._null_action:
- self._data = self.exec_action(self._data)
-
def run(self):
while True:
# this will return a dictionary with the key as the menu entry to be displayed
# and the value is the original value from the self._data container
+
data_formatted = self.reformat(self._data)
options = list(data_formatted.keys())
- options.append(self.separator)
+
+ if len(options) > 0:
+ options.append(self._separator)
if self._default_action:
options += self._default_action
- options += self.bottom_list
+ options += self._bottom_list
system('clear')
@@ -174,12 +175,12 @@ class ListManager:
sort=False,
clear_screen=False,
clear_menu_on_exit=False,
- header=self.header,
+ header=self._header,
skip_empty_entries=True,
skip=False
).run()
- if not target.value or target.value in self.bottom_list:
+ if not target.value or target.value in self._bottom_list:
self.action = target
break
@@ -201,13 +202,13 @@ class ListManager:
# Possible enhancement. If run_actions returns false a message line indicating the failure
self.run_actions(target.value)
- if target.value == self.cancel_action: # TODO dubious
+ if target.value == self._cancel_action: # TODO dubious
return self._original_data # return the original list
else:
return self._data
def run_actions(self,prompt_data=None):
- options = self.action_list() + self.bottom_item
+ options = self.action_list() + self._bottom_item
prompt = _("Select an action for < {} >").format(prompt_data if prompt_data else self.target)
choice = Menu(
prompt,
@@ -215,13 +216,13 @@ class ListManager:
sort=False,
clear_screen=False,
clear_menu_on_exit=False,
- preset_values=self.bottom_item,
+ preset_values=self._bottom_item,
show_search_hint=False
).run()
self.action = choice.value
- if self.action and self.action != self.cancel_action:
+ if self.action and self.action != self._cancel_action:
self._data = self.exec_action(self._data)
"""
@@ -243,7 +244,7 @@ class ListManager:
can define alternate action list or customize the list for each item.
Executed after any item is selected, contained in self.target
"""
- return self.base_actions
+ return self._base_actions
def exec_action(self, data: Any):
"""
diff --git a/archinstall/lib/models/subvolume.py b/archinstall/lib/models/subvolume.py
new file mode 100644
index 00000000..34a09227
--- /dev/null
+++ b/archinstall/lib/models/subvolume.py
@@ -0,0 +1,68 @@
+from dataclasses import dataclass
+from typing import List, Any, Dict
+
+
+@dataclass
+class Subvolume:
+ name: str
+ mountpoint: str
+ compress: bool = False
+ nodatacow: bool = False
+
+ def display(self) -> str:
+ options_str = ','.join(self.options)
+ return f'{_("Subvolume")}: {self.name:15} {_("Mountpoint")}: {self.mountpoint:20} {_("Options")}: {options_str}'
+
+ @property
+ def options(self) -> List[str]:
+ options = [
+ 'compress' if self.compress else '',
+ 'nodatacow' if self.nodatacow else ''
+ ]
+ return [o for o in options if len(o)]
+
+ def json(self) -> Dict[str, Any]:
+ return {
+ 'name': self.name,
+ 'mountpoint': self.mountpoint,
+ 'compress': self.compress,
+ 'nodatacow': self.nodatacow
+ }
+
+ @classmethod
+ def _parse(cls, config_subvolumes: List[Dict[str, Any]]) -> List['Subvolume']:
+ subvolumes = []
+ for entry in config_subvolumes:
+ if not entry.get('name', None) or not entry.get('mountpoint', None):
+ continue
+
+ subvolumes.append(
+ Subvolume(
+ entry['name'],
+ entry['mountpoint'],
+ entry.get('compress', False),
+ entry.get('nodatacow', False)
+ )
+ )
+
+ return subvolumes
+
+ @classmethod
+ def _parse_backwards_compatible(cls, config_subvolumes) -> List['Subvolume']:
+ subvolumes = []
+ for name, mountpoint in config_subvolumes.items():
+ if not name or not mountpoint:
+ continue
+
+ subvolumes.append(Subvolume(name, mountpoint))
+
+ return subvolumes
+
+ @classmethod
+ def parse_arguments(cls, config_subvolumes: Any) -> List['Subvolume']:
+ if isinstance(config_subvolumes, list):
+ return cls._parse(config_subvolumes)
+ elif isinstance(config_subvolumes, dict):
+ return cls._parse_backwards_compatible(config_subvolumes)
+
+ raise ValueError('Unknown disk layout btrfs subvolume format')
diff --git a/archinstall/lib/models/users.py b/archinstall/lib/models/users.py
index f72cabde..a8feb9ef 100644
--- a/archinstall/lib/models/users.py
+++ b/archinstall/lib/models/users.py
@@ -27,8 +27,10 @@ class User:
}
def display(self) -> str:
- strength = PasswordStrength.strength(self.password)
- password = '*' * len(self.password) + f' ({strength.value})'
+ password = '*' * (len(self.password) if self.password else 0)
+ if password:
+ strength = PasswordStrength.strength(self.password)
+ password += f' ({strength.value})'
return f'{_("Username")}: {self.username:16} {_("Password")}: {password:20} sudo: {str(self.sudo)}'
@classmethod
diff --git a/archinstall/lib/user_interaction/partitioning_conf.py b/archinstall/lib/user_interaction/partitioning_conf.py
index caf5f5df..63b7c7df 100644
--- a/archinstall/lib/user_interaction/partitioning_conf.py
+++ b/archinstall/lib/user_interaction/partitioning_conf.py
@@ -351,18 +351,16 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str,
if partition is not None:
if not block_device_struct["partitions"][partition].get('btrfs', {}):
block_device_struct["partitions"][partition]['btrfs'] = {}
- if not block_device_struct["partitions"][partition]['btrfs'].get('subvolumes', {}):
- block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = {}
+ if not block_device_struct["partitions"][partition]['btrfs'].get('subvolumes', []):
+ block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = []
prev = block_device_struct["partitions"][partition]['btrfs']['subvolumes']
- result = SubvolumeList(_("Manage btrfs subvolumes for current partition"),prev).run()
- if result:
- block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = result
- else:
- del block_device_struct["partitions"][partition]['btrfs']
+ result = SubvolumeList(_("Manage btrfs subvolumes for current partition"), prev).run()
+ block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = result
return block_device_struct
+
def select_encrypted_partitions(
title :str,
partitions :List[Partition],
diff --git a/archinstall/lib/user_interaction/subvolume_config.py b/archinstall/lib/user_interaction/subvolume_config.py
index 94e6f5d7..a54ec891 100644
--- a/archinstall/lib/user_interaction/subvolume_config.py
+++ b/archinstall/lib/user_interaction/subvolume_config.py
@@ -1,155 +1,94 @@
-from typing import Dict, List
+from typing import Dict, List, Optional, Any, TYPE_CHECKING
from ..menu.list_manager import ListManager
from ..menu.menu import MenuSelectionType
-from ..menu.selection_menu import Selector, GeneralMenu
from ..menu.text_input import TextInput
from ..menu import Menu
+from ..models.subvolume import Subvolume
+
+if TYPE_CHECKING:
+ _: Any
-"""
-UI classes
-"""
class SubvolumeList(ListManager):
- def __init__(self,prompt,list):
- self.ObjectNullAction = None # str(_('Add'))
- self.ObjectDefaultAction = str(_('Add'))
- super().__init__(prompt,list,None,self.ObjectNullAction,self.ObjectDefaultAction)
-
- def reformat(self, data: Dict) -> Dict:
- def presentation(key :str, value :Dict):
- text = _(" Subvolume :{:16}").format(key)
- if isinstance(value,str):
- text += _(" mounted at {:16}").format(value)
- else:
- if value.get('mountpoint'):
- text += _(" mounted at {:16}").format(value['mountpoint'])
- else:
- text += (' ' * 28)
-
- if value.get('options',[]):
- text += _(" with option {}").format(', '.join(value['options']))
- return text
-
- formatted = {presentation(k, v): k for k, v in data.items()}
- return {k: v for k, v in sorted(formatted.items(), key=lambda e: e[0])}
+ def __init__(self, prompt: str, current_volumes: List[Subvolume]):
+ self._actions = [
+ str(_('Add subvolume')),
+ str(_('Edit subvolume')),
+ str(_('Delete subvolume'))
+ ]
+ super().__init__(prompt, current_volumes, self._actions, self._actions[0])
- def action_list(self):
- return super().action_list()
+ def reformat(self, data: List[Subvolume]) -> Dict[str, Subvolume]:
+ return {e.display(): e for e in data}
- def exec_action(self, data: Dict):
- if self.target:
- origkey, origval = list(self.target.items())[0]
- else:
- origkey = None
+ def action_list(self):
+ active_user = self.target if self.target else None
- if self.action == str(_('Delete')):
- del data[origkey]
+ if active_user is None:
+ return [self._actions[0]]
else:
- if self.action == str(_('Add')):
- self.target = {}
- print(_('\n Fill the desired values for a new subvolume \n'))
- with SubvolumeMenu(self.target,self.action) as add_menu:
- for elem in ['name','mountpoint','options']:
- add_menu.exec_option(elem)
- else:
- SubvolumeMenu(self.target,self.action).run()
+ return self._actions[1:]
- data.update(self.target)
+ def _prompt_options(self, editing: Optional[Subvolume] = None) -> List[str]:
+ preset_options = []
+ if editing:
+ preset_options = editing.options
- return data
-
-
-class SubvolumeMenu(GeneralMenu):
- def __init__(self,parameters,action=None):
- self.data = parameters
- self.action = action
- self.ds = {}
- self.ds['name'] = None
- self.ds['mountpoint'] = None
- self.ds['options'] = None
- if self.data:
- origkey,origval = list(self.data.items())[0]
- self.ds['name'] = origkey
- if isinstance(origval,str):
- self.ds['mountpoint'] = origval
- else:
- self.ds['mountpoint'] = self.data[origkey].get('mountpoint')
- self.ds['options'] = self.data[origkey].get('options')
-
- super().__init__(data_store=self.ds)
-
- def _setup_selection_menu_options(self):
- self._menu_options['name'] = Selector(
- str(_('Subvolume name ')),
- self._select_subvolume_name if not self.action or self.action in (str(_('Add')), str(_('Copy'))) else None,
- mandatory=True,
- enabled=True)
-
- self._menu_options['mountpoint'] = Selector(
- str(_('Subvolume mountpoint')),
- self._select_subvolume_mount_point if not self.action or self.action in (str(_('Add')),str(_('Edit'))) else None,
- enabled=True)
-
- self._menu_options['options'] = Selector(
- str(_('Subvolume options')),
- self._select_subvolume_options if not self.action or self.action in (str(_('Add')),str(_('Edit'))) else None,
- enabled=True)
-
- self._menu_options['save'] = Selector(
- str(_('Save')),
- exec_func=lambda n,v:True,
- enabled=True)
-
- self._menu_options['cancel'] = Selector(
- str(_('Cancel')),
- # func = lambda pre:True,
- exec_func=lambda n,v:self.fast_exit(n),
- enabled=True)
-
- self.cancel_action = 'cancel'
- self.save_action = 'save'
- self.bottom_list = [self.save_action,self.cancel_action]
-
- def fast_exit(self,accion):
- if self.option(accion).get_selection():
- for item in self.list_options():
- if self.option(item).is_mandatory():
- self.option(item).set_mandatory(False)
- return True
-
- def exit_callback(self):
- # we exit without moving data
- if self.option(self.cancel_action).get_selection():
- return
- if not self.ds['name']:
- return
- else:
- key = self.ds['name']
- value = {}
- if self.ds['mountpoint']:
- value['mountpoint'] = self.ds['mountpoint']
- if self.ds['options']:
- value['options'] = self.ds['options']
- self.data.update({key : value})
-
- def _select_subvolume_name(self,value):
- return TextInput(str(_("Subvolume name :")),value).run()
-
- def _select_subvolume_mount_point(self,value):
- return TextInput(str(_("Select a mount point :")),value).run()
-
- def _select_subvolume_options(self,value) -> List[str]:
- # def __init__(self, title, p_options, skip=True, multi=False, default_option=None, sort=True):
choice = Menu(
str(_("Select the desired subvolume options ")),
['nodatacow','compress'],
skip=True,
- preset_values=value,
+ preset_values=preset_options,
multi=True
).run()
if choice.type_ == MenuSelectionType.Selection:
- return choice.value
+ return choice.value # type: ignore
return []
+
+ def _add_subvolume(self, editing: Optional[Subvolume] = None) -> Optional[Subvolume]:
+ name = TextInput(f'\n\n{_("Subvolume name")}: ', editing.name if editing else '').run()
+
+ if not name:
+ return None
+
+ mountpoint = TextInput(f'\n{_("Subvolume mountpoint")}: ', editing.mountpoint if editing else '').run()
+
+ if not mountpoint:
+ return None
+
+ options = self._prompt_options(editing)
+
+ subvolume = Subvolume(name, mountpoint)
+ subvolume.compress = 'compress' in options
+ subvolume.nodatacow = 'nodatacow' in options
+
+ return subvolume
+
+ def exec_action(self, data: List[Subvolume]) -> List[Subvolume]:
+ if self.target:
+ active_subvolume = self.target
+ else:
+ active_subvolume = None
+
+ if self.action == self._actions[0]: # add
+ new_subvolume = self._add_subvolume()
+
+ if new_subvolume is not None:
+ # in case a user with the same username as an existing user
+ # was created we'll replace the existing one
+ data = [d for d in data if d.name != new_subvolume.name]
+ data += [new_subvolume]
+ elif self.action == self._actions[1]: # edit subvolume
+ new_subvolume = self._add_subvolume(active_subvolume)
+
+ if new_subvolume is not None:
+ # we'll remove the original subvolume and add the modified version
+ data = [d for d in data if d.name != active_subvolume.name and d.name != new_subvolume.name]
+ data += [new_subvolume]
+ elif self.action == self._actions[2]: # delete
+ data = [d for d in data if d != active_subvolume]
+
+ return data