Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib')
-rw-r--r--archinstall/lib/configuration.py14
-rw-r--r--archinstall/lib/disk/blockdevice.py348
-rw-r--r--archinstall/lib/disk/btrfs/__init__.py132
-rw-r--r--archinstall/lib/disk/btrfs/btrfs_helpers.py126
-rw-r--r--archinstall/lib/disk/btrfs/btrfspartition.py37
-rw-r--r--archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py (renamed from archinstall/lib/disk/btrfs/btrfssubvolume.py)9
-rw-r--r--archinstall/lib/disk/filesystem.py38
-rw-r--r--archinstall/lib/disk/helpers.py50
-rw-r--r--archinstall/lib/disk/mapperdev.py16
-rw-r--r--archinstall/lib/disk/partition.py426
-rw-r--r--archinstall/lib/disk/user_guides.py19
-rw-r--r--archinstall/lib/disk/validators.py8
-rw-r--r--archinstall/lib/exceptions.py8
-rw-r--r--archinstall/lib/general.py43
-rw-r--r--archinstall/lib/installer.py124
-rw-r--r--archinstall/lib/locale_helpers.py2
-rw-r--r--archinstall/lib/luks.py6
-rw-r--r--archinstall/lib/menu/global_menu.py134
-rw-r--r--archinstall/lib/menu/list_manager.py314
-rw-r--r--archinstall/lib/menu/menu.py74
-rw-r--r--archinstall/lib/menu/selection_menu.py55
-rw-r--r--archinstall/lib/menu/simple_menu.py23
-rw-r--r--archinstall/lib/mirrors.py27
-rw-r--r--archinstall/lib/models/network_configuration.py67
-rw-r--r--archinstall/lib/models/password_strength.py85
-rw-r--r--archinstall/lib/models/subvolume.py68
-rw-r--r--archinstall/lib/models/users.py13
-rw-r--r--archinstall/lib/networking.py15
-rw-r--r--archinstall/lib/output.py80
-rw-r--r--archinstall/lib/plugins.py4
-rw-r--r--archinstall/lib/storage.py4
-rw-r--r--archinstall/lib/systemd.py2
-rw-r--r--archinstall/lib/translation.py112
-rw-r--r--archinstall/lib/translationhandler.py230
-rw-r--r--archinstall/lib/user_interaction/__init__.py2
-rw-r--r--archinstall/lib/user_interaction/backwards_compatible_conf.py2
-rw-r--r--archinstall/lib/user_interaction/disk_conf.py4
-rw-r--r--archinstall/lib/user_interaction/general_conf.py121
-rw-r--r--archinstall/lib/user_interaction/manage_users_conf.py60
-rw-r--r--archinstall/lib/user_interaction/network_conf.py98
-rw-r--r--archinstall/lib/user_interaction/partitioning_conf.py74
-rw-r--r--archinstall/lib/user_interaction/subvolume_config.py202
-rw-r--r--archinstall/lib/user_interaction/system_conf.py8
-rw-r--r--archinstall/lib/user_interaction/utils.py32
44 files changed, 1766 insertions, 1550 deletions
diff --git a/archinstall/lib/configuration.py b/archinstall/lib/configuration.py
index 510f7103..2a43174d 100644
--- a/archinstall/lib/configuration.py
+++ b/archinstall/lib/configuration.py
@@ -1,4 +1,6 @@
+import os
import json
+import stat
import logging
import pathlib
from typing import Optional, Dict
@@ -106,23 +108,33 @@ class ConfigurationOutput:
def save_user_config(self, dest_path :pathlib.Path = None):
if self._is_valid_path(dest_path):
- with open(dest_path / self._user_config_file, 'w') as config_file:
+ target = dest_path / self._user_config_file
+
+ with open(target, 'w') as config_file:
config_file.write(self.user_config_to_json())
+ os.chmod(str(dest_path / self._user_config_file), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
+
def save_user_creds(self, dest_path :pathlib.Path = None):
if self._is_valid_path(dest_path):
if user_creds := self.user_credentials_to_json():
target = dest_path / self._user_creds_file
+
with open(target, 'w') as config_file:
config_file.write(user_creds)
+ os.chmod(str(target), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
+
def save_disk_layout(self, dest_path :pathlib.Path = None):
if self._is_valid_path(dest_path):
if disk_layout := self.disk_layout_to_json():
target = dest_path / self._disk_layout_file
+
with target.open('w') as config_file:
config_file.write(disk_layout)
+ os.chmod(str(target), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
+
def save(self, dest_path :pathlib.Path = None):
if not dest_path:
dest_path = self._default_save_path
diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py
index c7b69205..736bacbc 100644
--- a/archinstall/lib/disk/blockdevice.py
+++ b/archinstall/lib/disk/blockdevice.py
@@ -1,13 +1,11 @@
from __future__ import annotations
-import os
import json
import logging
import time
-from functools import cached_property
-from typing import Optional, Dict, Any, Iterator, Tuple, List, TYPE_CHECKING
-# https://stackoverflow.com/a/39757388/929999
-if TYPE_CHECKING:
- from .partition import Partition
+
+from collections import OrderedDict
+from dataclasses import dataclass
+from typing import Optional, Dict, Any, Iterator, List, TYPE_CHECKING
from ..exceptions import DiskError, SysCallError
from ..output import log
@@ -15,18 +13,44 @@ from ..general import SysCommand
from ..storage import storage
+if TYPE_CHECKING:
+ from .partition import Partition
+ _: Any
+
+
+@dataclass
+class BlockSizeInfo:
+ start: str
+ end: str
+ size: str
+
+
+@dataclass
+class BlockInfo:
+ pttype: str
+ ptuuid: str
+ size: int
+ tran: Optional[str]
+ rota: bool
+ free_space: Optional[List[BlockSizeInfo]]
+
+
class BlockDevice:
def __init__(self, path :str, info :Optional[Dict[str, Any]] = None):
if not info:
from .helpers import all_blockdevices
# If we don't give any information, we need to auto-fill it.
# Otherwise any subsequent usage will break.
- info = all_blockdevices(partitions=False)[path].info
+ self.info = all_blockdevices(partitions=False)[path].info
+ else:
+ self.info = info
- self.path = path
- self.info = info
+ self._path = path
self.keep_partitions = True
- self.part_cache = {}
+ self._block_info = self._fetch_information()
+ self._partitions: Dict[str, 'Partition'] = {}
+
+ self._load_partitions()
# TODO: Currently disk encryption is a BIT misleading.
# It's actually partition-encryption, but for future-proofing this
@@ -35,70 +59,113 @@ class BlockDevice:
def __repr__(self, *args :str, **kwargs :str) -> str:
return self._str_repr
- @cached_property
+ @property
+ def path(self) -> str:
+ return self._path
+
+ @property
def _str_repr(self) -> str:
- return f"BlockDevice({self.device_or_backfile}, size={self._safe_size}GB, free_space={self._safe_free_space}, bus_type={self.bus_type})"
-
- @cached_property
- def display_info(self) -> str:
- columns = {
- str(_('Device')): self.device_or_backfile,
- str(_('Size')): f'{self._safe_size}GB',
- str(_('Free space')): f'{self._safe_free_space}',
+ return f"BlockDevice({self._device_or_backfile}, size={self.size}GB, free_space={self._safe_free_space()}, bus_type={self.bus_type})"
+
+ def as_json(self) -> Dict[str, Any]:
+ return {
+ str(_('Device')): self._device_or_backfile,
+ str(_('Size')): f'{self.size}GB',
+ str(_('Free space')): f'{self._safe_free_space()}',
str(_('Bus-type')): f'{self.bus_type}'
}
- padding = max([len(k) for k in columns.keys()])
-
- pretty = ''
- for k, v in columns.items():
- k = k.ljust(padding, ' ')
- pretty += f'{k} = {v}\n'
-
- return pretty.rstrip()
-
- def __iter__(self) -> Iterator[Partition]:
+ def __iter__(self) -> Iterator['Partition']:
for partition in self.partitions:
yield self.partitions[partition]
def __getitem__(self, key :str, *args :str, **kwargs :str) -> Any:
if hasattr(self, key):
return getattr(self, key)
- elif key not in self.info:
- raise KeyError(f'{self} does not contain information: "{key}"')
- return self.info[key]
- def __len__(self) -> int:
- return len(self.partitions)
+ if self.info and key in self.info:
+ return self.info[key]
+
+ raise KeyError(f'{self.info} does not contain information: "{key}"')
def __lt__(self, left_comparitor :'BlockDevice') -> bool:
- return self.path < left_comparitor.path
+ return self._path < left_comparitor.path
def json(self) -> str:
"""
json() has precedence over __dump__, so this is a way
to give less/partial information for user readability.
"""
- return self.path
+ return self._path
def __dump__(self) -> Dict[str, Dict[str, Any]]:
return {
- self.path : {
- 'partuuid' : self.uuid,
- 'wipe' : self.info.get('wipe', None),
- 'partitions' : [part.__dump__() for part in self.partitions.values()]
+ self._path: {
+ 'partuuid': self.uuid,
+ 'wipe': self.info.get('wipe', None),
+ 'partitions': [part.__dump__() for part in self.partitions.values()]
}
}
- @property
- def partition_type(self) -> str:
- output = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.path}").decode('UTF-8'))
+ def _call_lsblk(self, path: str) -> Dict[str, Any]:
+ output = SysCommand(f'lsblk --json -b -o+SIZE,PTTYPE,ROTA,TRAN,PTUUID {self._path}').decode('UTF-8')
+ if output:
+ lsblk_info = json.loads(output)
+ return lsblk_info
+
+ raise DiskError(f'Failed to read disk "{self.path}" with lsblk')
+
+ def _load_partitions(self):
+ from .partition import Partition
+
+ self._partitions.clear()
+
+ lsblk_info = self._call_lsblk(self._path)
+ device = lsblk_info['blockdevices'][0]
+ self._partitions.clear()
+
+ if children := device.get('children', None):
+ root = f'/dev/{device["name"]}'
+ for child in children:
+ part_id = child['name'].removeprefix(device['name'])
+ self._partitions[part_id] = Partition(root + part_id, block_device=self, part_id=part_id)
+
+ def _get_free_space(self) -> Optional[List[BlockSizeInfo]]:
+ # NOTE: parted -s will default to `cancel` on prompt, skipping any partition
+ # that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso,
+ # so the free will ignore the ESP partition and just give the "free" space.
+ # Doesn't harm us, but worth noting in case something weird happens.
+ try:
+ output = SysCommand(f"parted -s --machine {self._path} print free").decode('utf-8')
+ if output:
+ free_lines = [line for line in output.split('\n') if 'free' in line]
+ sizes = []
+ for free_space in free_lines:
+ _, start, end, size, *_ = free_space.strip('\r\n;').split(':')
+ sizes.append(BlockSizeInfo(start, end, size))
+
+ return sizes
+ except SysCallError as error:
+ log(f"Could not get free space on {self._path}: {error}", level=logging.DEBUG)
+
+ return None
+
+ def _fetch_information(self) -> BlockInfo:
+ lsblk_info = self._call_lsblk(self._path)
+ device = lsblk_info['blockdevices'][0]
+ free_space = self._get_free_space()
- for device in output['blockdevices']:
- return device['pttype']
+ return BlockInfo(
+ pttype=device['pttype'],
+ ptuuid=device['ptuuid'],
+ size=device['size'],
+ tran=device['tran'],
+ rota=device['rota'],
+ free_space=free_space
+ )
- @cached_property
- def device_or_backfile(self) -> str:
+ @property
+ def _device_or_backfile(self) -> Optional[str]:
"""
Returns the actual device-endpoint of the BlockDevice.
If it's a loop-back-device it returns the back-file,
@@ -118,7 +185,7 @@ class BlockDevice:
return None
@property
- def device(self) -> str:
+ def device(self) -> Optional[str]:
"""
Returns the device file of the BlockDevice.
If it's a loop-back-device it returns the /dev/X device,
@@ -126,168 +193,82 @@ class BlockDevice:
And if it's a crypto-device it returns the parent device
"""
if "DEVTYPE" not in self.info:
- raise DiskError(f'Could not locate backplane info for "{self.path}"')
+ raise DiskError(f'Could not locate backplane info for "{self._path}"')
if self.info['DEVTYPE'] in ['disk','loop']:
- return self.path
+ return self._path
elif self.info['DEVTYPE'][:4] == 'raid':
# This should catch /dev/md## raid devices
- return self.path
+ return self._path
elif self.info['DEVTYPE'] == 'crypt':
if 'pkname' not in self.info:
- raise DiskError(f'A crypt device ({self.path}) without a parent kernel device name.')
+ raise DiskError(f'A crypt device ({self._path}) without a parent kernel device name.')
return f"/dev/{self.info['pkname']}"
else:
- log(f"Unknown blockdevice type for {self.path}: {self.info['DEVTYPE']}", level=logging.DEBUG)
-
- # if not stat.S_ISBLK(os.stat(full_path).st_mode):
- # raise DiskError(f'Selected disk "{full_path}" is not a block device.')
-
- @property
- def partitions(self) -> Dict[str, Partition]:
- from .filesystem import Partition
-
- self.partprobe()
- result = SysCommand(['/usr/bin/lsblk', '-J', self.path])
-
- if b'not a block device' in result:
- raise DiskError(f'Can not read partitions off something that isn\'t a block device: {self.path}')
+ log(f"Unknown blockdevice type for {self._path}: {self.info['DEVTYPE']}", level=logging.DEBUG)
- if not result[:1] == b'{':
- raise DiskError('Error getting JSON output from:', f'/usr/bin/lsblk -J {self.path}')
-
- r = json.loads(result.decode('UTF-8'))
- if len(r['blockdevices']) and 'children' in r['blockdevices'][0]:
- root_path = f"/dev/{r['blockdevices'][0]['name']}"
- for part in r['blockdevices'][0]['children']:
- part_id = part['name'][len(os.path.basename(self.path)):]
- if part_id not in self.part_cache:
- # TODO: Force over-write even if in cache?
- if part_id not in self.part_cache or self.part_cache[part_id].size != part['size']:
- self.part_cache[part_id] = Partition(root_path + part_id, block_device=self, part_id=part_id)
-
- return {k: self.part_cache[k] for k in sorted(self.part_cache)}
+ return None
@property
- def partition(self) -> Partition:
- all_partitions = self.partitions
- return [all_partitions[k] for k in all_partitions]
+ def partition_type(self) -> str:
+ return self._block_info.pttype
@property
- def partition_table_type(self) -> int:
- # TODO: Don't hardcode :)
- # Remove if we don't use this function anywhere
- from .filesystem import GPT
- return GPT
-
- @cached_property
def uuid(self) -> str:
- log('BlockDevice().uuid is untested!', level=logging.WARNING, fg='yellow')
- """
- Returns the disk UUID as returned by lsblk.
- This is more reliable than relying on /dev/disk/by-partuuid as
- it doesn't seam to be able to detect md raid partitions.
- """
- return SysCommand(f'blkid -s PTUUID -o value {self.path}').decode('UTF-8')
-
- @cached_property
- def _safe_size(self) -> float:
- from .helpers import convert_size_to_gb
-
- try:
- output = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.path}").decode('UTF-8'))
- except SysCallError:
- return -1.0
-
- for device in output['blockdevices']:
- return convert_size_to_gb(device['size'])
+ return self._block_info.ptuuid
- @cached_property
+ @property
def size(self) -> float:
from .helpers import convert_size_to_gb
+ return convert_size_to_gb(self._block_info.size)
- output = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.path}").decode('UTF-8'))
-
- for device in output['blockdevices']:
- return convert_size_to_gb(device['size'])
-
- @cached_property
- def bus_type(self) -> str:
- output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8'))
-
- for device in output['blockdevices']:
- return device['tran']
+ @property
+ def bus_type(self) -> Optional[str]:
+ return self._block_info.tran
- @cached_property
+ @property
def spinning(self) -> bool:
- output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8'))
+ return self._block_info.rota
- for device in output['blockdevices']:
- return device['rota'] is True
+ @property
+ def partitions(self) -> Dict[str, 'Partition']:
+ return OrderedDict(sorted(self._partitions.items()))
- @cached_property
- def _safe_free_space(self) -> Tuple[str, ...]:
- try:
- return '+'.join(part[2] for part in self.free_space)
- except SysCallError:
- return '?'
+ @property
+ def partition(self) -> List['Partition']:
+ return list(self.partitions.values())
- @cached_property
- def free_space(self) -> Tuple[str, ...]:
- # NOTE: parted -s will default to `cancel` on prompt, skipping any partition
- # that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso,
- # so the free will ignore the ESP partition and just give the "free" space.
- # Doesn't harm us, but worth noting in case something weird happens.
- try:
- for line in SysCommand(f"parted -s --machine {self.path} print free"):
- if 'free' in (free_space := line.decode('UTF-8')):
- _, start, end, size, *_ = free_space.strip('\r\n;').split(':')
- yield (start, end, size)
- except SysCallError as error:
- log(f"Could not get free space on {self.path}: {error}", level=logging.DEBUG)
-
- @cached_property
- def largest_free_space(self) -> List[str]:
- info = []
- for space_info in self.free_space:
- if not info:
- info = space_info
- else:
- # [-1] = size
- if space_info[-1] > info[-1]:
- info = space_info
- return info
-
- @cached_property
+ @property
def first_free_sector(self) -> str:
- if info := self.largest_free_space:
- start = info[0]
+ if block_size := self._largest_free_space():
+ return block_size.start
else:
- start = '512MB'
- return start
+ return '512MB'
- @cached_property
+ @property
def first_end_sector(self) -> str:
- if info := self.largest_free_space:
- end = info[1]
+ if block_size := self._largest_free_space():
+ return block_size.end
else:
- end = f"{self.size}GB"
- return end
-
- def partprobe(self) -> bool:
- return SysCommand(['partprobe', self.path]).exit_code == 0
+ return f"{self.size}GB"
+
+ def _safe_free_space(self) -> str:
+ if self._block_info.free_space:
+ sizes = [free_space.size for free_space in self._block_info.free_space]
+ return '+'.join(sizes)
+ return '?'
+
+ def _largest_free_space(self) -> Optional[BlockSizeInfo]:
+ if self._block_info.free_space:
+ sorted_sizes = sorted(self._block_info.free_space, key=lambda x: x.size, reverse=True)
+ return sorted_sizes[0]
+ return None
- def has_partitions(self) -> int:
- return len(self.partitions)
-
- def has_mount_point(self, mountpoint :str) -> bool:
- for partition in self.partitions:
- if self.partitions[partition].mountpoint == mountpoint:
- return True
- return False
+ def _partprobe(self) -> bool:
+ return SysCommand(['partprobe', self._path]).exit_code == 0
def flush_cache(self) -> None:
- self.part_cache = {}
+ self._load_partitions()
def get_partition(self, uuid :Optional[str] = None, partuuid :Optional[str] = None) -> Partition:
if not uuid and not partuuid:
@@ -296,9 +277,9 @@ class BlockDevice:
for count in range(storage.get('DISK_RETRY_ATTEMPTS', 5)):
for partition_index, partition in self.partitions.items():
try:
- if uuid and partition.uuid.lower() == uuid.lower():
+ if uuid and partition.uuid and partition.uuid.lower() == uuid.lower():
return partition
- elif partuuid and partition.part_uuid.lower() == partuuid.lower():
+ elif partuuid and partition.part_uuid and partition.part_uuid.lower() == partuuid.lower():
return partition
except DiskError as error:
# Most likely a blockdevice that doesn't support or use UUID's
@@ -307,9 +288,10 @@ class BlockDevice:
pass
log(f"uuid {uuid} or {partuuid} not found. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s for next attempt",level=logging.DEBUG)
+ self.flush_cache()
time.sleep(storage.get('DISK_TIMEOUTS', 1) * count)
-
+
log(f"Could not find {uuid}/{partuuid} in disk after 5 retries", level=logging.INFO)
- log(f"Cache: {self.part_cache}")
+ log(f"Cache: {self._partitions}")
log(f"Partitions: {self.partitions.items()}")
raise DiskError(f"Partition {uuid}/{partuuid} was never found on {self} despite several attempts.")
diff --git a/archinstall/lib/disk/btrfs/__init__.py b/archinstall/lib/disk/btrfs/__init__.py
index 84b9c0f6..a26e0160 100644
--- a/archinstall/lib/disk/btrfs/__init__.py
+++ b/archinstall/lib/disk/btrfs/__init__.py
@@ -2,8 +2,7 @@ from __future__ import annotations
import pathlib
import glob
import logging
-import re
-from typing import Union, Dict, TYPE_CHECKING, Any, Iterator
+from typing import Union, Dict, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
@@ -15,30 +14,15 @@ from .btrfs_helpers import (
setup_subvolumes as setup_subvolumes,
mount_subvolume as mount_subvolume
)
-from .btrfssubvolume import BtrfsSubvolume as BtrfsSubvolume
+from .btrfssubvolumeinfo import BtrfsSubvolumeInfo as BtrfsSubvolume
from .btrfspartition import BTRFSPartition as BTRFSPartition
-from ..helpers import get_mount_info
from ...exceptions import DiskError, Deprecated
from ...general import SysCommand
from ...output import log
-from ...exceptions import SysCallError
-def get_subvolume_info(path :pathlib.Path) -> Dict[str, Any]:
- try:
- output = SysCommand(f"btrfs subvol show {path}").decode()
- except SysCallError as error:
- print('Error:', error)
- result = {}
- for line in output.replace('\r\n', '\n').split('\n'):
- if ':' in line:
- key, val = line.replace('\t', '').split(':', 1)
- result[key.strip().lower().replace(' ', '_')] = val.strip()
-
- return result
-
-def create_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str]) -> bool:
+def create_subvolume(installation: Installer, subvolume_location :Union[pathlib.Path, str]) -> bool:
"""
This function uses btrfs to create a subvolume.
@@ -70,113 +54,3 @@ def create_subvolume(installation :Installer, subvolume_location :Union[pathlib.
log(f"Creating a subvolume on {target}", level=logging.INFO)
if (cmd := SysCommand(f"btrfs subvolume create {target}")).exit_code != 0:
raise DiskError(f"Could not create a subvolume at {target}: {cmd}")
-
-def _has_option(option :str,options :list) -> bool:
- """ auxiliary routine to check if an option is present in a list.
- we check if the string appears in one of the options, 'cause it can appear in severl forms (option, option=val,...)
- """
- if not options:
- return False
-
- for item in options:
- if option in item:
- return True
-
- return False
-
-def manage_btrfs_subvolumes(installation :Installer,
- partition :Dict[str, str],) -> list:
-
- raise Deprecated("Use setup_subvolumes() instead.")
-
- from copy import deepcopy
- """ we do the magic with subvolumes in a centralized place
- parameters:
- * the installation object
- * the partition dictionary entry which represents the physical partition
- returns
- * mountpoinst, the list which contains all the "new" partititon to be mounted
-
- We expect the partition has been mounted as / , and it to be unmounted after the processing
- Then we create all the subvolumes inside btrfs as demand
- We clone then, both the partition dictionary and the object inside it and adapt it to the subvolume needs
- Then we return a list of "new" partitions to be processed as "normal" partitions
- # TODO For encrypted devices we need some special processing prior to it
- """
- # We process each of the pairs <subvolume name: mount point | None | mount info dict>
- # th mount info dict has an entry for the path of the mountpoint (named 'mountpoint') and 'options' which is a list
- # of mount options (or similar used by brtfs)
- mountpoints = []
- subvolumes = partition['btrfs']['subvolumes']
- for name, right_hand in subvolumes.items():
- try:
- # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load - every subvolume is created from the top of the hierarchy- and simplifies its further use
- if name.startswith('/'):
- name = name[1:]
- # renormalize the right hand.
- location = None
- subvol_options = []
- # no contents, so it is not to be mounted
- if not right_hand:
- location = None
- # just a string. per backward compatibility the mount point
- elif isinstance(right_hand,str):
- location = right_hand
- # a dict. two elements 'mountpoint' (obvious) and and a mount options list ¿?
- elif isinstance(right_hand,dict):
- location = right_hand.get('mountpoint',None)
- subvol_options = right_hand.get('options',[])
- # we create the subvolume
- create_subvolume(installation,name)
- # Make the nodatacow processing now
- # It will be the main cause of creation of subvolumes which are not to be mounted
- # it is not an options which can be established by subvolume (but for whole file systems), and can be
- # set up via a simple attribute change in a directory (if empty). And here the directories are brand new
- if 'nodatacow' in subvol_options:
- if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0:
- raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}")
- # entry is deleted so nodatacow doesn't propagate to the mount options
- del subvol_options[subvol_options.index('nodatacow')]
- # Make the compress processing now
- # it is not an options which can be established by subvolume (but for whole file systems), and can be
- # set up via a simple attribute change in a directory (if empty). And here the directories are brand new
- # in this way only zstd compression is activaded
- # TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated
- if 'compress' in subvol_options:
- if not _has_option('compress',partition.get('filesystem',{}).get('mount_options',[])):
- if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0:
- raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}")
- # entry is deleted so compress doesn't propagate to the mount options
- del subvol_options[subvol_options.index('compress')]
- # END compress processing.
- # we do not mount if THE basic partition will be mounted or if we exclude explicitly this subvolume
- if not partition['mountpoint'] and location is not None:
- # we begin to create a fake partition entry. First we copy the original -the one that corresponds to
- # the primary partition. We make a deepcopy to avoid altering the original content in any case
- fake_partition = deepcopy(partition)
- # we start to modify entries in the "fake partition" to match the needs of the subvolumes
- # to avoid any chance of entering in a loop (not expected) we delete the list of subvolumes in the copy
- del fake_partition['btrfs']
- fake_partition['encrypted'] = False
- fake_partition['generate-encryption-key-file'] = False
- # Mount destination. As of now the right hand part
- fake_partition['mountpoint'] = location
- # we load the name in an attribute called subvolume, but i think it is not needed anymore, 'cause the mount logic uses a different path.
- fake_partition['subvolume'] = name
- # here we add the special mount options for the subvolume, if any.
- # if the original partition['options'] is not a list might give trouble
- if fake_partition.get('filesystem',{}).get('mount_options',[]):
- fake_partition['filesystem']['mount_options'].extend(subvol_options)
- else:
- fake_partition['filesystem']['mount_options'] = subvol_options
- # Here comes the most exotic part. The dictionary attribute 'device_instance' contains an instance of Partition. This instance will be queried along the mount process at the installer.
- # As the rest will query there the path of the "partition" to be mounted, we feed it with the bind name needed to mount subvolumes
- # As we made a deepcopy we have a fresh instance of this object we can manipulate problemless
- fake_partition['device_instance'].path = f"{partition['device_instance'].path}[/{name}]"
-
- # Well, now that this "fake partition" is ready, we add it to the list of the ones which are to be mounted,
- # as "normal" ones
- mountpoints.append(fake_partition)
- except Exception as e:
- raise e
- return mountpoints
diff --git a/archinstall/lib/disk/btrfs/btrfs_helpers.py b/archinstall/lib/disk/btrfs/btrfs_helpers.py
index d577d82b..f6d2734a 100644
--- a/archinstall/lib/disk/btrfs/btrfs_helpers.py
+++ b/archinstall/lib/disk/btrfs/btrfs_helpers.py
@@ -1,72 +1,73 @@
-import pathlib
import logging
-from typing import Optional
+import re
+from pathlib import Path
+from typing import Optional, Dict, Any, TYPE_CHECKING
+from ...models.subvolume import Subvolume
from ...exceptions import SysCallError, DiskError
from ...general import SysCommand
from ...output import log
+from ...plugins import plugins
from ..helpers import get_mount_info
-from .btrfssubvolume import BtrfsSubvolume
+from .btrfssubvolumeinfo import BtrfsSubvolumeInfo
+if TYPE_CHECKING:
+ from .btrfspartition import BTRFSPartition
+ from ...installer import Installer
-def mount_subvolume(installation, device, name, subvolume_information):
- # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load.
- # Every subvolume is created from the top of the hierarchy- and simplifies its further use
- name = name.lstrip('/')
- # renormalize the right hand.
- mountpoint = subvolume_information.get('mountpoint', None)
- if not mountpoint:
- return None
+class fstab_btrfs_compression_plugin():
+ def __init__(self, partition_dict):
+ self.partition_dict = partition_dict
+
+ def on_genfstab(self, installation):
+ with open(f"{installation.target}/etc/fstab", 'r') as fh:
+ fstab = fh.read()
+
+ # Replace the {installation}/etc/fstab with entries
+ # using the compress=zstd where the mountpoint has compression set.
+ with open(f"{installation.target}/etc/fstab", 'w') as fh:
+ for line in fstab.split('\n'):
+ # So first we grab the mount options by using subvol=.*? as a locator.
+ # And we also grab the mountpoint for the entry, for instance /var/log
+ if (subvoldef := re.findall(',.*?subvol=.*?[\t ]', line)) and (mountpoint := re.findall('[\t ]/.*?[\t ]', line)):
+ for subvolume in self.partition_dict.get('btrfs', {}).get('subvolumes', []):
+ # We then locate the correct subvolume and check if it's compressed
+ if subvolume.compress and subvolume.mountpoint == mountpoint[0].strip():
+ # We then sneak in the compress=zstd option if it doesn't already exist:
+ # We skip entries where compression is already defined
+ if ',compress=zstd,' not in line:
+ line = line.replace(subvoldef[0], f",compress=zstd{subvoldef[0]}")
+ break
+
+ fh.write(f"{line}\n")
- if type(mountpoint) == str:
- mountpoint = pathlib.Path(mountpoint)
+ return True
- installation_target = installation.target
- if type(installation_target) == str:
- installation_target = pathlib.Path(installation_target)
+
+def mount_subvolume(installation: 'Installer', device: 'BTRFSPartition', subvolume: Subvolume):
+ # we normalize the subvolume name (getting rid of slash at the start if exists.
+ # In our implementation has no semantic load.
+ # Every subvolume is created from the top of the hierarchy- and simplifies its further use
+ name = subvolume.name.lstrip('/')
+ mountpoint = Path(subvolume.mountpoint)
+ installation_target = Path(installation.target)
mountpoint = installation_target / mountpoint.relative_to(mountpoint.anchor)
mountpoint.mkdir(parents=True, exist_ok=True)
-
- mount_options = subvolume_information.get('options', [])
- if not any('subvol=' in x for x in mount_options):
- mount_options += [f'subvol={name}']
+ mount_options = subvolume.options + [f'subvol={name}']
log(f"Mounting subvolume {name} on {device} to {mountpoint}", level=logging.INFO, fg="gray")
SysCommand(f"mount {device.path} {mountpoint} -o {','.join(mount_options)}")
-def setup_subvolumes(installation, partition_dict):
- """
- Taken from: ..user_guides.py
-
- partition['btrfs'] = {
- "subvolumes" : {
- "@": "/",
- "@home": "/home",
- "@log": "/var/log",
- "@pkg": "/var/cache/pacman/pkg",
- "@.snapshots": "/.snapshots"
- }
- }
- """
+def setup_subvolumes(installation: 'Installer', partition_dict: Dict[str, Any]):
log(f"Setting up subvolumes: {partition_dict['btrfs']['subvolumes']}", level=logging.INFO, fg="gray")
- for name, right_hand in partition_dict['btrfs']['subvolumes'].items():
- # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load.
- # Every subvolume is created from the top of the hierarchy- and simplifies its further use
- name = name.lstrip('/')
- # renormalize the right hand.
- # mountpoint = None
- subvol_options = []
-
- match right_hand:
- # case str(): # backwards-compatability
- # mountpoint = right_hand
- case dict():
- # mountpoint = right_hand.get('mountpoint', None)
- subvol_options = right_hand.get('options', [])
+ for subvolume in partition_dict['btrfs']['subvolumes']:
+ # we normalize the subvolume name (getting rid of slash at the start if exists. In our implementation has no semantic load.
+ # Every subvolume is created from the top of the hierarchy- and simplifies its further use
+ name = subvolume.name.lstrip('/')
# We create the subvolume using the BTRFSPartition instance.
# That way we ensure not only easy access, but also accurate mount locations etc.
@@ -76,27 +77,28 @@ def setup_subvolumes(installation, partition_dict):
# It will be the main cause of creation of subvolumes which are not to be mounted
# it is not an options which can be established by subvolume (but for whole file systems), and can be
# set up via a simple attribute change in a directory (if empty). And here the directories are brand new
- if 'nodatacow' in subvol_options:
+ if subvolume.nodatacow:
if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0:
raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}")
- # entry is deleted so nodatacow doesn't propagate to the mount options
- del subvol_options[subvol_options.index('nodatacow')]
+
# Make the compress processing now
# it is not an options which can be established by subvolume (but for whole file systems), and can be
# set up via a simple attribute change in a directory (if empty). And here the directories are brand new
# in this way only zstd compression is activaded
# TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated
- if 'compress' in subvol_options:
+ if subvolume.compress:
if not any(['compress' in filesystem_option for filesystem_option in partition_dict.get('filesystem', {}).get('mount_options', [])]):
if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0:
raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}")
- # entry is deleted so compress doesn't propagate to the mount options
- del subvol_options[subvol_options.index('compress')]
-def subvolume_info_from_path(path :pathlib.Path) -> Optional[BtrfsSubvolume]:
+ if 'fstab_btrfs_compression_plugin' not in plugins:
+ plugins['fstab_btrfs_compression_plugin'] = fstab_btrfs_compression_plugin(partition_dict)
+
+
+def subvolume_info_from_path(path: Path) -> Optional[BtrfsSubvolumeInfo]:
try:
- subvolume_name = None
+ subvolume_name = ''
result = {}
for index, line in enumerate(SysCommand(f"btrfs subvolume show {path}")):
if index == 0:
@@ -110,14 +112,14 @@ def subvolume_info_from_path(path :pathlib.Path) -> Optional[BtrfsSubvolume]:
# allows for hooking in a pre-processor to do this we have to do it here:
result[key.lower().replace(' ', '_').replace('(s)', 's')] = value.strip()
- return BtrfsSubvolume(**{'full_path' : path, 'name' : subvolume_name, **result})
-
+ return BtrfsSubvolumeInfo(**{'full_path' : path, 'name' : subvolume_name, **result}) # type: ignore
except SysCallError as error:
log(f"Could not retrieve subvolume information from {path}: {error}", level=logging.WARNING, fg="orange")
return None
-def find_parent_subvolume(path :pathlib.Path, filters=[]):
+
+def find_parent_subvolume(path: Path, filters=[]) -> Optional[BtrfsSubvolumeInfo]:
# A root path cannot have a parent
if str(path) == '/':
return None
@@ -127,6 +129,8 @@ def find_parent_subvolume(path :pathlib.Path, filters=[]):
if found_mount['target'] == '/':
return None
- return find_parent_subvolume(path.parent, traverse=True, filters=[*filters, found_mount['target']])
+ return find_parent_subvolume(path.parent, filters=[*filters, found_mount['target']])
- return subvolume \ No newline at end of file
+ return subvolume
+
+ return None
diff --git a/archinstall/lib/disk/btrfs/btrfspartition.py b/archinstall/lib/disk/btrfs/btrfspartition.py
index 5020133d..d04c9b98 100644
--- a/archinstall/lib/disk/btrfs/btrfspartition.py
+++ b/archinstall/lib/disk/btrfs/btrfspartition.py
@@ -15,24 +15,13 @@ from .btrfs_helpers import (
if TYPE_CHECKING:
from ...installer import Installer
- from .btrfssubvolume import BtrfsSubvolume
+ from .btrfssubvolumeinfo import BtrfsSubvolumeInfo
+
class BTRFSPartition(Partition):
def __init__(self, *args, **kwargs):
Partition.__init__(self, *args, **kwargs)
- def __repr__(self, *args :str, **kwargs :str) -> str:
- mount_repr = ''
- if self.mountpoint:
- mount_repr = f", mounted={self.mountpoint}"
- elif self.target_mountpoint:
- mount_repr = f", rel_mountpoint={self.target_mountpoint}"
-
- if self._encrypted:
- return f'BTRFSPartition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, parent={self.real_device}, fs={self.filesystem}{mount_repr})'
- else:
- return f'BTRFSPartition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})'
-
@property
def subvolumes(self):
for filesystem in findmnt(pathlib.Path(self.path), recurse=True).get('filesystems', []):
@@ -40,17 +29,17 @@ class BTRFSPartition(Partition):
yield subvolume_info_from_path(filesystem['target'])
def iterate_children(struct):
- for child in struct.get('children', []):
+ for c in struct.get('children', []):
if '[' in child.get('source', ''):
- yield subvolume_info_from_path(child['target'])
+ yield subvolume_info_from_path(c['target'])
- for sub_child in iterate_children(child):
+ for sub_child in iterate_children(c):
yield sub_child
for child in iterate_children(filesystem):
yield child
- def create_subvolume(self, subvolume :pathlib.Path, installation :Optional['Installer'] = None) -> 'BtrfsSubvolume':
+ def create_subvolume(self, subvolume :pathlib.Path, installation :Optional['Installer'] = None) -> 'BtrfsSubvolumeInfo':
"""
Subvolumes have to be created within a mountpoint.
This means we need to get the current installation target.
@@ -62,13 +51,13 @@ class BTRFSPartition(Partition):
if not installation:
installation = storage.get('installation_session')
- # Determain if the path given, is an absolute path or a releative path.
+ # Determain if the path given, is an absolute path or a relative path.
# We do this by checking if the path contains a known mountpoint.
if str(subvolume)[0] == '/':
if filesystems := findmnt(subvolume, traverse=True).get('filesystems'):
if (target := filesystems[0].get('target')) and target != '/' and str(subvolume).startswith(target):
# Path starts with a known mountpoint which isn't /
- # Which means it's an absolut path to a mounted location.
+ # Which means it's an absolute path to a mounted location.
pass
else:
# Since it's not an absolute position with a known start.
@@ -108,9 +97,13 @@ class BTRFSPartition(Partition):
if glob.glob(str(subvolume / '*')):
raise DiskError(f"Cannot create subvolume at {subvolume} because it contains data (non-empty folder target is not supported by BTRFS)")
- elif subvolinfo := subvolume_info_from_path(subvolume):
- raise DiskError(f"Destination {subvolume} is already a subvolume: {subvolinfo}")
+ # Ideally we would like to check if the destination is already a subvolume.
+ # But then we would need the mount-point at this stage as well.
+ # So we'll comment out this check:
+ # elif subvolinfo := subvolume_info_from_path(subvolume):
+ # raise DiskError(f"Destination {subvolume} is already a subvolume: {subvolinfo}")
+ # And deal with it here:
SysCommand(f"btrfs subvolume create {subvolume}")
- return subvolume_info_from_path(subvolume) \ No newline at end of file
+ return subvolume_info_from_path(subvolume)
diff --git a/archinstall/lib/disk/btrfs/btrfssubvolume.py b/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py
index a96e2a94..5f5bdea6 100644
--- a/archinstall/lib/disk/btrfs/btrfssubvolume.py
+++ b/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py
@@ -16,8 +16,9 @@ from ...general import SysCommand
from ...output import log
from ...storage import storage
+
@dataclass
-class BtrfsSubvolume:
+class BtrfsSubvolumeInfo:
full_path :pathlib.Path
name :str
uuid :str
@@ -68,9 +69,9 @@ class BtrfsSubvolume:
from .btrfs_helpers import subvolume_info_from_path
# TODO: Make this function traverse storage['MOUNT_POINT'] and find the first
- # occurance of a mountpoint that is a btrfs volume instead of lazy assume / is a subvolume.
+ # occurrence of a mountpoint that is a btrfs volume instead of lazy assume / is a subvolume.
# It would also be nice if it could use findmnt(self.full_path) and traverse backwards
- # finding the last occurance of a subvolume which 'self' belongs to.
+ # finding the last occurrence of a subvolume which 'self' belongs to.
if volume := subvolume_info_from_path(storage['MOUNT_POINT']):
return self.full_path == volume.full_path
@@ -188,4 +189,4 @@ class BtrfsSubvolume:
def unmount(self, recurse :bool = True):
SysCommand(f"umount {'-R' if recurse else ''} {self.full_path}")
- log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray") \ No newline at end of file
+ log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray")
diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py
index f94b4b47..5d5952a0 100644
--- a/archinstall/lib/disk/filesystem.py
+++ b/archinstall/lib/disk/filesystem.py
@@ -74,7 +74,7 @@ class Filesystem:
raise KeyError(f"Could not create a GPT label on {self}")
elif self.mode == MBR:
if not self.parted_mklabel(self.blockdevice.device, "msdos"):
- raise KeyError(f"Could not create a MSDOS label on {self}")
+ raise KeyError(f"Could not create a MS-DOS label on {self}")
self.blockdevice.flush_cache()
time.sleep(3)
@@ -100,7 +100,7 @@ class Filesystem:
partition['device_instance'] = self.blockdevice.get_partition(uuid=partition_uuid)
except DiskError:
partition['device_instance'] = self.blockdevice.get_partition(partuuid=partition_uuid)
-
+
log(_("Re-using partition instance: {}").format(partition['device_instance']), level=logging.DEBUG, fg="gray")
else:
log(f"{self}.load_layout() doesn't know how to work without 'wipe' being set or UUID ({partition.get('PARTUUID')}) was given and found.", fg="yellow", level=logging.WARNING)
@@ -210,7 +210,14 @@ class Filesystem:
# TODO: Implement this with declarative profiles instead.
raise ValueError("Installation().use_entire_disk() has to be re-worked.")
- def add_partition(self, partition_type :str, start :str, end :str, partition_format :Optional[str] = None, skip_mklabel :bool = False) -> Partition:
+ def add_partition(
+ self,
+ partition_type :str,
+ start :str,
+ end :str,
+ partition_format :Optional[str] = None,
+ skip_mklabel :bool = False
+ ) -> Partition:
log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO)
if len(self.blockdevice.partitions) == 0 and skip_mklabel is False:
@@ -221,7 +228,7 @@ class Filesystem:
raise KeyError(f"Could not create a GPT label on {self}")
elif self.mode == MBR:
if not self.parted_mklabel(self.blockdevice.device, "msdos"):
- raise KeyError(f"Could not create a MSDOS label on {self}")
+ raise KeyError(f"Could not create a MS-DOS label on {self}")
self.blockdevice.flush_cache()
@@ -232,6 +239,7 @@ class Filesystem:
except DiskError:
pass
+ # TODO this check should probably run in the setup process rather than during the installation
if self.mode == MBR:
if len(self.blockdevice.partitions) > 3:
DiskError("Too many partitions on disk, MBR disks can only have 3 primary partitions")
@@ -245,15 +253,9 @@ class Filesystem:
if self.parted(parted_string):
for count in range(storage.get('DISK_RETRY_ATTEMPTS', 3)):
- self.partprobe()
-
- new_partition_uuids = []
- for partition in self.blockdevice.partitions.values():
- try:
- new_partition_uuids.append(partition.part_uuid)
- except DiskError:
- pass
+ self.blockdevice.flush_cache()
+ new_partition_uuids = [partition.part_uuid for partition in self.blockdevice.partitions.values()]
new_partuuid_set = (set(previous_partuuids) ^ set(new_partition_uuids))
if len(new_partuuid_set) and (new_partuuid := new_partuuid_set.pop()):
@@ -263,17 +265,23 @@ class Filesystem:
log(f'Blockdevice: {self.blockdevice}', level=logging.ERROR, fg="red")
log(f'Partitions: {self.blockdevice.partitions}', level=logging.ERROR, fg="red")
log(f'Partition set: {new_partuuid_set}', level=logging.ERROR, fg="red")
- log(f'New UUID: {[new_partuuid]}', level=logging.ERROR, fg="red")
+ log(f'New PARTUUID: {[new_partuuid]}', level=logging.ERROR, fg="red")
log(f'get_partition(): {self.blockdevice.get_partition}', level=logging.ERROR, fg="red")
raise err
else:
log(f"Could not get UUID for partition. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s before retrying.",level=logging.DEBUG)
- time.sleep(storage.get('DISK_TIMEOUTS', 1) * count)
+ self.partprobe()
+ time.sleep(max(0.1, storage.get('DISK_TIMEOUTS', 1)))
+ else:
+ print("Parted did not return True during partition creation")
+
+ total_partitions = set([partition.part_uuid for partition in self.blockdevice.partitions.values()])
+ total_partitions.update(previous_partuuids)
# TODO: This should never be able to happen
log(f"Could not find the new PARTUUID after adding the partition.", level=logging.ERROR, fg="red")
log(f"Previous partitions: {previous_partuuids}", level=logging.ERROR, fg="red")
- log(f"New partitions: {(previous_partuuids ^ {partition.part_uuid for partition in self.blockdevice.partitions.values()})}", level=logging.ERROR, fg="red")
+ log(f"New partitions: {total_partitions}", level=logging.ERROR, fg="red")
raise DiskError(f"Could not add partition using: {parted_string}")
def set_name(self, partition: int, name: str) -> bool:
diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py
index 99856aad..f19125f4 100644
--- a/archinstall/lib/disk/helpers.py
+++ b/archinstall/lib/disk/helpers.py
@@ -8,6 +8,8 @@ import time
import glob
from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
+from ..models.subvolume import Subvolume
+
if TYPE_CHECKING:
from .partition import Partition
@@ -112,7 +114,7 @@ def cleanup_bash_escapes(data :str) -> str:
def blkid(cmd :str) -> Dict[str, Any]:
if '-o' in cmd and '-o export' not in cmd:
- raise ValueError(f"blkid() requires '-o export' to be used and can therefor not continue reliably.")
+ raise ValueError(f"blkid() requires '-o export' to be used and can therefore not continue reliably.")
elif '-o' not in cmd:
cmd += ' -o export'
@@ -133,7 +135,7 @@ def blkid(cmd :str) -> Dict[str, Any]:
key, val = line.split('=', 1)
if key.lower() == 'devname':
devname = val
- # Lowercase for backwards compatability with all_disks() previous use cases
+ # Lowercase for backwards compatibility with all_disks() previous use cases
result[devname] = {
"path": devname,
"PATH": devname
@@ -218,7 +220,12 @@ def all_blockdevices(mappers=False, partitions=False, error=False) -> Dict[str,
# we'll iterate the /sys/class definitions and find the information
# from there.
for block_device in glob.glob("/sys/class/block/*"):
- device_path = f"/dev/{pathlib.Path(block_device).readlink().name}"
+ device_path = pathlib.Path(f"/dev/{pathlib.Path(block_device).readlink().name}")
+
+ if device_path.exists() is False:
+ log(f"Unknown device found by '/sys/class/block/*', ignoring: {device_path}", level=logging.WARNING, fg="yellow")
+ continue
+
try:
information = blkid(f'blkid -p -o export {device_path}')
except SysCallError as ex:
@@ -227,12 +234,17 @@ def all_blockdevices(mappers=False, partitions=False, error=False) -> Dict[str,
try:
information = get_loop_info(device_path)
if not information:
+ print("Exit code for blkid -p -o export was:", ex.exit_code)
raise SysCallError("Could not get loop information", exit_code=1)
except SysCallError:
+ print("Not a loop device, trying uevent rules.")
information = get_blockdevice_uevent(pathlib.Path(block_device).readlink().name)
else:
+ # We could not reliably get any information, perhaps the disk is clean of information?
+ print("Raising ex because:", ex.exit_code)
raise ex
+ # return instances
information = enrich_blockdevice_information(information)
@@ -244,7 +256,7 @@ def all_blockdevices(mappers=False, partitions=False, error=False) -> Dict[str,
instances[path] = Partition(path, block_device=BlockDevice(get_parent_of_partition(pathlib.Path(path))))
elif path_info.get('PTTYPE', False) is not False or path_info.get('TYPE') == 'loop':
instances[path] = BlockDevice(path, path_info)
- elif path_info.get('TYPE') == 'squashfs':
+ elif path_info.get('TYPE') in ('squashfs', 'erofs'):
# We can ignore squashfs devices (usually /dev/loop0 on Arch ISO)
continue
else:
@@ -368,7 +380,7 @@ def get_all_targets(data :Dict[str, Any], filters :Dict[str, None] = {}) -> Dict
return filters
-def get_partitions_in_use(mountpoint :str) -> List[Partition]:
+def get_partitions_in_use(mountpoint :str) -> Dict[str, Any]:
from .partition import Partition
try:
@@ -391,12 +403,20 @@ def get_partitions_in_use(mountpoint :str) -> List[Partition]:
if not type(blockdev) in (Partition, MapperDev):
continue
- for blockdev_mountpoint in blockdev.mount_information:
- block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev
+ if isinstance(blockdev, Partition):
+ for blockdev_mountpoint in blockdev.mountpoints:
+ block_devices_mountpoints[blockdev_mountpoint] = blockdev
+ else:
+ for blockdev_mountpoint in blockdev.mount_information:
+ block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev
log(f'Filtering available mounts {block_devices_mountpoints} to those under {mountpoint}', level=logging.DEBUG)
for mountpoint in list(get_all_targets(output['filesystems']).keys()):
+ # Since all_blockdevices() returns PosixPath objects, we need to convert
+ # findmnt paths to pathlib.Path() first:
+ mountpoint = pathlib.Path(mountpoint)
+
if mountpoint in block_devices_mountpoints:
if mountpoint not in mounts:
mounts[mountpoint] = block_devices_mountpoints[mountpoint]
@@ -433,9 +453,10 @@ def disk_layouts() -> Optional[Dict[str, Any]]:
def encrypted_partitions(blockdevices :Dict[str, Any]) -> bool:
- for partition in blockdevices.values():
- if partition.get('encrypted', False):
- yield partition
+ for blockdevice in blockdevices.values():
+ for partition in blockdevice.get('partitions', []):
+ if partition.get('encrypted', False):
+ yield partition
def find_partition_by_mountpoint(block_devices :List[BlockDevice], relative_mountpoint :str) -> Partition:
for device in block_devices:
@@ -468,13 +489,14 @@ def convert_device_to_uuid(path :str) -> str:
raise DiskError(f"Could not retrieve the UUID of {path} within a timely manner.")
+
def has_mountpoint(partition: Union[dict,Partition,MapperDev], target: str, strict: bool = True) -> bool:
""" Determine if a certain partition is mounted (or has a mountpoint) as specific target (path)
Coded for clarity rather than performance
Input parms:
:parm partition the partition we check
- :type Either a Partition object or a dict with the contents of a partition definiton in the disk_layouts schema
+ :type Either a Partition object or a dict with the contents of a partition definition in the disk_layouts schema
:parm target (a string representing a mount path we want to check for.
:type str
@@ -484,10 +506,12 @@ def has_mountpoint(partition: Union[dict,Partition,MapperDev], target: str, stri
"""
# we create the mountpoint list
if isinstance(partition,dict):
- subvols = partition.get('btrfs',{}).get('subvolumes',{})
- mountpoints = [partition.get('mountpoint'),] + [subvols[subvol] if isinstance(subvols[subvol],str) or not subvols[subvol] else subvols[subvol].get('mountpoint') for subvol in subvols]
+ subvolumes: List[Subvolume] = partition.get('btrfs',{}).get('subvolumes', [])
+ mountpoints = [partition.get('mountpoint')]
+ mountpoints += [volume.mountpoint for volume in subvolumes]
else:
mountpoints = [partition.mountpoint,] + [subvol.target for subvol in partition.subvolumes]
+
# we check
if strict or target == '/':
if target in mountpoints:
diff --git a/archinstall/lib/disk/mapperdev.py b/archinstall/lib/disk/mapperdev.py
index 913dbc13..71ef2a79 100644
--- a/archinstall/lib/disk/mapperdev.py
+++ b/archinstall/lib/disk/mapperdev.py
@@ -10,7 +10,7 @@ from ..general import SysCommand
from ..output import log
if TYPE_CHECKING:
- from .btrfs import BtrfsSubvolume
+ from .btrfs import BtrfsSubvolumeInfo
@dataclass
class MapperDev:
@@ -37,12 +37,12 @@ class MapperDev:
for slave in glob.glob(f"/sys/class/block/{dm_device.name}/slaves/*"):
partition_belonging_to_dmcrypt_device = pathlib.Path(slave).name
-
+
try:
uevent_data = SysCommand(f"blkid -o export /dev/{partition_belonging_to_dmcrypt_device}").decode()
except SysCallError as error:
log(f"Could not get information on device /dev/{partition_belonging_to_dmcrypt_device}: {error}", level=logging.ERROR, fg="red")
-
+
information = uevent(uevent_data)
block_device = BlockDevice(get_parent_of_partition('/dev/' / pathlib.Path(information['DEVNAME'])))
@@ -65,9 +65,13 @@ class MapperDev:
return None
@property
+ def mountpoints(self) -> List[Dict[str, Any]]:
+ return [obj['target'] for obj in self.mount_information]
+
+ @property
def mount_information(self) -> List[Dict[str, Any]]:
from .helpers import find_mountpoint
- return list(find_mountpoint(self.path))
+ return [{**obj, 'target' : pathlib.Path(obj.get('target', '/dev/null'))} for obj in find_mountpoint(self.path)]
@property
def filesystem(self) -> Optional[str]:
@@ -75,10 +79,10 @@ class MapperDev:
return get_filesystem_type(self.path)
@property
- def subvolumes(self) -> Iterator['BtrfsSubvolume']:
+ def subvolumes(self) -> Iterator['BtrfsSubvolumeInfo']:
from .btrfs import subvolume_info_from_path
for mountpoint in self.mount_information:
if target := mountpoint.get('target'):
if subvolume := subvolume_info_from_path(pathlib.Path(target)):
- yield subvolume \ No newline at end of file
+ yield subvolume
diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py
index 73c88597..56a7d436 100644
--- a/archinstall/lib/disk/partition.py
+++ b/archinstall/lib/disk/partition.py
@@ -1,60 +1,83 @@
import glob
-import pathlib
import time
import logging
import json
import os
import hashlib
+import typing
+from dataclasses import dataclass
+from pathlib import Path
from typing import Optional, Dict, Any, List, Union, Iterator
from .blockdevice import BlockDevice
-from .helpers import find_mountpoint, get_filesystem_type, convert_size_to_gb, split_bind_name
+from .helpers import get_filesystem_type, convert_size_to_gb, split_bind_name
from ..storage import storage
from ..exceptions import DiskError, SysCallError, UnknownFilesystemFormat
from ..output import log
from ..general import SysCommand
from .btrfs.btrfs_helpers import subvolume_info_from_path
-from .btrfs.btrfssubvolume import BtrfsSubvolume
+from .btrfs.btrfssubvolumeinfo import BtrfsSubvolumeInfo
+
+
+@dataclass
+class PartitionInfo:
+ pttype: str
+ partuuid: str
+ uuid: str
+ start: Optional[int]
+ end: Optional[int]
+ bootable: bool
+ size: float
+ sector_size: int
+ filesystem_type: str
+ mountpoints: List[Path]
+
+ def get_first_mountpoint(self) -> Optional[Path]:
+ if len(self.mountpoints) > 0:
+ return self.mountpoints[0]
+ return None
+
class Partition:
- def __init__(self,
+ def __init__(
+ self,
path: str,
block_device: BlockDevice,
part_id :Optional[str] = None,
filesystem :Optional[str] = None,
mountpoint :Optional[str] = None,
encrypted :bool = False,
- autodetect_filesystem :bool = True):
-
+ autodetect_filesystem :bool = True,
+ ):
if not part_id:
part_id = os.path.basename(path)
- self.block_device = block_device
- if type(self.block_device) is str:
+ if type(block_device) is str:
raise ValueError(f"Partition()'s 'block_device' parameter has to be a archinstall.BlockDevice() instance!")
- self.path = path
- self.part_id = part_id
- self.target_mountpoint = mountpoint
- self.filesystem = filesystem
+ self.block_device = block_device
+ self._path = path
+ self._part_id = part_id
+ self._target_mountpoint = mountpoint
self._encrypted = None
- self.encrypted = encrypted
- self.allow_formatting = False
+ self._encrypted = encrypted
+ self._wipe = False
+ self._type = 'primary'
if mountpoint:
self.mount(mountpoint)
- try:
- self.mount_information = list(find_mountpoint(self.path))
- except DiskError:
- self.mount_information = [{}]
+ self._partition_info = self._fetch_information()
- if not self.filesystem and autodetect_filesystem:
- self.filesystem = get_filesystem_type(path)
+ if not autodetect_filesystem and filesystem:
+ self._partition_info.filesystem_type = filesystem
- if self.filesystem == 'crypto_LUKS':
- self.encrypted = True
+ if self._partition_info.filesystem_type == 'crypto_LUKS':
+ self._encrypted = True
+ # I hate doint this but I'm currently unsure where this
+ # is acutally used to be able to fix the typing issues properly
+ @typing.no_type_check
def __lt__(self, left_comparitor :BlockDevice) -> bool:
if type(left_comparitor) == Partition:
left_comparitor = left_comparitor.path
@@ -62,147 +85,175 @@ class Partition:
left_comparitor = str(left_comparitor)
# The goal is to check if /dev/nvme0n1p1 comes before /dev/nvme0n1p5
- return self.path < left_comparitor
+ return self._path < left_comparitor
def __repr__(self, *args :str, **kwargs :str) -> str:
mount_repr = ''
- if self.mountpoint:
- mount_repr = f", mounted={self.mountpoint}"
- elif self.target_mountpoint:
- mount_repr = f", rel_mountpoint={self.target_mountpoint}"
+ if mountpoint := self._partition_info.get_first_mountpoint():
+ mount_repr = f", mounted={mountpoint}"
+ elif self._target_mountpoint:
+ mount_repr = f", rel_mountpoint={self._target_mountpoint}"
+
+ classname = self.__class__.__name__
if self._encrypted:
- return f'Partition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, parent={self.real_device}, fs={self.filesystem}{mount_repr})'
+ return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, parent={self.real_device}, fs={self._partition_info.filesystem_type}{mount_repr})'
else:
- return f'Partition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})'
+ return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, fs={self._partition_info.filesystem_type}{mount_repr})'
+
+ def as_json(self) -> Dict[str, Any]:
+ """
+ this is used for the table representation of the partition (see FormattedOutput)
+ """
+ partition_info = {
+ 'type': self._type,
+ 'PARTUUID': self.part_uuid,
+ 'wipe': self._wipe,
+ 'boot': self.boot,
+ 'ESP': self.boot,
+ 'mountpoint': self._target_mountpoint,
+ 'encrypted': self._encrypted,
+ 'start': self.start,
+ 'size': self.end,
+ 'filesystem': self._partition_info.filesystem_type
+ }
+
+ return partition_info
def __dump__(self) -> Dict[str, Any]:
+ # TODO remove this in favour of as_json
return {
- 'type': 'primary',
- 'PARTUUID': self._safe_uuid,
- 'wipe': self.allow_formatting,
+ 'type': self._type,
+ 'PARTUUID': self.part_uuid,
+ 'wipe': self._wipe,
'boot': self.boot,
'ESP': self.boot,
- 'mountpoint': self.target_mountpoint,
+ 'mountpoint': self._target_mountpoint,
'encrypted': self._encrypted,
'start': self.start,
'size': self.end,
'filesystem': {
- 'format': get_filesystem_type(self.path)
+ 'format': self._partition_info.filesystem_type
}
}
- @property
- def mountpoint(self) -> Optional[str]:
- try:
- data = json.loads(SysCommand(f"findmnt --json -R {self.path}").decode())
- for filesystem in data['filesystems']:
- return pathlib.Path(filesystem.get('target'))
+ def _call_lsblk(self) -> Dict[str, Any]:
+ self.partprobe()
+ # This sleep might be overkill, but lsblk is known to
+ # work against a chaotic cache that can change during call
+ # causing no information to be returned (blkid is better)
+ # time.sleep(1)
+ # TODO: Maybe incorporate a re-try system here based on time.sleep(max(0.1, storage.get('DISK_TIMEOUTS', 1)))
+
+ try:
+ output = SysCommand(f"lsblk --json -b -o+LOG-SEC,SIZE,PTTYPE,PARTUUID,UUID,FSTYPE {self.device_path}").decode('UTF-8')
except SysCallError as error:
- # Not mounted anywhere most likely
- log(f"Could not locate mount information for {self.path}: {error}", level=logging.DEBUG, fg="grey")
- pass
+ # It appears as if lsblk can return exit codes like 8192 to indicate something.
+ # But it does return output so we'll try to catch it.
+ output = error.worker.decode('UTF-8')
- return None
+ if output:
+ lsblk_info = json.loads(output)
+ return lsblk_info
- @property
- def sector_size(self) -> Optional[int]:
- output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.device_path}").decode('UTF-8'))
+ raise DiskError(f'Failed to read disk "{self.device_path}" with lsblk')
- for device in output['blockdevices']:
- return device.get('log-sec', None)
+ def _call_sfdisk(self) -> Dict[str, Any]:
+ output = SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')
- @property
- def start(self) -> Optional[str]:
- output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
+ if output:
+ sfdisk_info = json.loads(output)
+ partitions = sfdisk_info.get('partitiontable', {}).get('partitions', [])
+ node = list(filter(lambda x: x['node'] == self._path, partitions))
- for partition in output.get('partitiontable', {}).get('partitions', []):
- if partition['node'] == self.path:
- return partition['start'] # * self.sector_size
+ if len(node) > 0:
+ return node[0]
- @property
- def end(self) -> Optional[str]:
- # TODO: actually this is size in sectors unit
- # TODO: Verify that the logic holds up, that 'size' is the size without 'start' added to it.
- output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
+ return {}
+
+ raise DiskError(f'Failed to read disk "{self.block_device.path}" with sfdisk')
+
+ def _fetch_information(self) -> PartitionInfo:
+ lsblk_info = self._call_lsblk()
+ sfdisk_info = self._call_sfdisk()
+
+ if not (device := lsblk_info.get('blockdevices', [None])[0]):
+ raise DiskError(f'Failed to retrieve information for "{self.device_path}" with lsblk')
- for partition in output.get('partitiontable', {}).get('partitions', []):
- if partition['node'] == self.path:
- return partition['size'] # * self.sector_size
+ mountpoints = [Path(mountpoint) for mountpoint in device['mountpoints'] if mountpoint]
+ bootable = sfdisk_info.get('bootable', False) or sfdisk_info.get('type', '') == 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B'
+
+ return PartitionInfo(
+ pttype=device['pttype'],
+ partuuid=device['partuuid'],
+ uuid=device['uuid'],
+ sector_size=device['log-sec'],
+ size=convert_size_to_gb(device['size']),
+ start=sfdisk_info.get('start', None),
+ end=sfdisk_info.get('size', None),
+ bootable=bootable,
+ filesystem_type=device['fstype'],
+ mountpoints=mountpoints
+ )
@property
- def end_sectors(self) -> Optional[str]:
- output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
+ def target_mountpoint(self) -> Optional[str]:
+ return self._target_mountpoint
- for partition in output.get('partitiontable', {}).get('partitions', []):
- if partition['node'] == self.path:
- return partition['start'] + partition['size']
+ @property
+ def path(self) -> str:
+ return self._path
@property
- def size(self) -> Optional[float]:
- for i in range(storage['DISK_RETRY_ATTEMPTS']):
- self.partprobe()
- time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i))
+ def filesystem(self) -> str:
+ return self._partition_info.filesystem_type
- try:
- lsblk = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.device_path}").decode())
+ @property
+ def mountpoint(self) -> Optional[Path]:
+ if len(self.mountpoints) > 0:
+ return self.mountpoints[0]
+ return None
- for device in lsblk['blockdevices']:
- return convert_size_to_gb(device['size'])
- except SysCallError as error:
- if error.exit_code == 8192:
- return None
- else:
- raise error
+ @property
+ def mountpoints(self) -> List[Path]:
+ return self._partition_info.mountpoints
@property
- def boot(self) -> bool:
- output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
-
- # Get the bootable flag from the sfdisk output:
- # {
- # "partitiontable": {
- # "device":"/dev/loop0",
- # "partitions": [
- # {"node":"/dev/loop0p1", "start":2048, "size":10483712, "type":"83", "bootable":true}
- # ]
- # }
- # }
-
- for partition in output.get('partitiontable', {}).get('partitions', []):
- if partition['node'] == self.path:
- # first condition is for MBR disks, second for GPT disks
- return partition.get('bootable', False) or partition.get('type','') == 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B'
+ def sector_size(self) -> int:
+ return self._partition_info.sector_size
- return False
+ @property
+ def start(self) -> Optional[int]:
+ return self._partition_info.start
@property
- def partition_type(self) -> Optional[str]:
- lsblk = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.device_path}").decode('UTF-8'))
+ def end(self) -> Optional[int]:
+ return self._partition_info.end
- for device in lsblk['blockdevices']:
- return device['pttype']
+ @property
+ def end_sectors(self) -> Optional[int]:
+ start = self._partition_info.start
+ end = self._partition_info.end
+ if start and end:
+ return start + end
+ return None
@property
- def part_uuid(self) -> Optional[str]:
- """
- Returns the PARTUUID as returned by lsblk.
- This is more reliable than relying on /dev/disk/by-partuuid as
- it doesn't seam to be able to detect md raid partitions.
- For bind mounts all the subvolumes share the same uuid
- """
- for i in range(storage['DISK_RETRY_ATTEMPTS']):
- if not self.partprobe():
- raise DiskError(f"Could not perform partprobe on {self.device_path}")
-
- time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i))
+ def size(self) -> Optional[float]:
+ return self._partition_info.size
- partuuid = self._safe_part_uuid
- if partuuid:
- return partuuid
+ @property
+ def boot(self) -> bool:
+ return self._partition_info.bootable
- raise DiskError(f"Could not get PARTUUID for {self.path} using 'blkid -s PARTUUID -o value {self.path}'")
+ @property
+ def partition_type(self) -> Optional[str]:
+ return self._partition_info.pttype
+
+ @property
+ def part_uuid(self) -> str:
+ return self._partition_info.partuuid
@property
def uuid(self) -> Optional[str]:
@@ -232,7 +283,7 @@ class Partition:
For instance when you want to get a __repr__ of the class.
"""
if not self.partprobe():
- if self.block_device.info.get('TYPE') == 'iso9660':
+ if self.block_device.partition_type == 'iso9660':
return None
log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG)
@@ -240,7 +291,7 @@ class Partition:
try:
return SysCommand(f'blkid -s UUID -o value {self.device_path}').decode('UTF-8').strip()
except SysCallError as error:
- if self.block_device.info.get('TYPE') == 'iso9660':
+ if self.block_device.partition_type == 'iso9660':
# Parent device is a Optical Disk (.iso dd'ed onto a device for instance)
return None
@@ -254,7 +305,7 @@ class Partition:
For instance when you want to get a __repr__ of the class.
"""
if not self.partprobe():
- if self.block_device.info.get('TYPE') == 'iso9660':
+ if self.block_device.partition_type == 'iso9660':
return None
log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG)
@@ -262,37 +313,39 @@ class Partition:
try:
return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip()
except SysCallError as error:
- if self.block_device.info.get('TYPE') == 'iso9660':
+ if self.block_device.partition_type == 'iso9660':
# Parent device is a Optical Disk (.iso dd'ed onto a device for instance)
return None
log(f"Could not get PARTUUID of partition using 'blkid -s PARTUUID -o value {self.device_path}': {error}")
+ return self._partition_info.uuid
+
@property
def encrypted(self) -> Union[bool, None]:
return self._encrypted
- @encrypted.setter
- def encrypted(self, value: bool) -> None:
- self._encrypted = value
-
@property
def parent(self) -> str:
return self.real_device
@property
def real_device(self) -> str:
- for blockdevice in json.loads(SysCommand('lsblk -J').decode('UTF-8'))['blockdevices']:
- if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)):
- return f"/dev/{parent}"
- # raise DiskError(f'Could not find appropriate parent for encrypted partition {self}')
- return self.path
+ output = SysCommand('lsblk -J').decode('UTF-8')
+
+ if output:
+ for blockdevice in json.loads(output)['blockdevices']:
+ if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)):
+ return f"/dev/{parent}"
+ return self._path
+
+ raise DiskError('Unable to get disk information for command "lsblk -J"')
@property
def device_path(self) -> str:
- """ for bind mounts returns the phisical path of the partition
+ """ for bind mounts returns the physical path of the partition
"""
- device_path, bind_name = split_bind_name(self.path)
+ device_path, bind_name = split_bind_name(self._path)
return device_path
@property
@@ -300,38 +353,40 @@ class Partition:
""" for bind mounts returns the bind name (subvolume path).
Returns none if this property does not exist
"""
- device_path, bind_name = split_bind_name(self.path)
+ device_path, bind_name = split_bind_name(self._path)
return bind_name
@property
- def subvolumes(self) -> Iterator[BtrfsSubvolume]:
+ def subvolumes(self) -> Iterator[BtrfsSubvolumeInfo]:
from .helpers import findmnt
-
+
def iterate_children_recursively(information):
for child in information.get('children', []):
if target := child.get('target'):
- if subvolume := subvolume_info_from_path(pathlib.Path(target)):
- yield subvolume
+ if child.get('fstype') == 'btrfs':
+ if subvolume := subvolume_info_from_path(Path(target)):
+ yield subvolume
if child.get('children'):
for subchild in iterate_children_recursively(child):
yield subchild
- for mountpoint in self.mount_information:
- if result := findmnt(pathlib.Path(mountpoint['target'])):
- for filesystem in result.get('filesystems', []):
- if subvolume := subvolume_info_from_path(pathlib.Path(mountpoint['target'])):
- yield subvolume
+ if self._partition_info.filesystem_type == 'btrfs':
+ for mountpoint in self._partition_info.mountpoints:
+ if result := findmnt(mountpoint):
+ for filesystem in result.get('filesystems', []):
+ if subvolume := subvolume_info_from_path(mountpoint):
+ yield subvolume
- for child in iterate_children_recursively(filesystem):
- yield child
+ for child in iterate_children_recursively(filesystem):
+ yield child
def partprobe(self) -> bool:
try:
if self.block_device:
return 0 == SysCommand(f'partprobe {self.block_device.device}').exit_code
except SysCallError as error:
- log(f"Unreliable results might be given for {self.path} due to partprobe error: {error}", level=logging.DEBUG)
+ log(f"Unreliable results might be given for {self._path} due to partprobe error: {error}", level=logging.DEBUG)
return False
@@ -343,19 +398,20 @@ class Partition:
with luks2(self, storage.get('ENC_IDENTIFIER', 'ai') + 'loop', password, auto_unmount=True) as unlocked_device:
return unlocked_device.filesystem
except SysCallError:
- return None
+ pass
+ return None
def has_content(self) -> bool:
- fs_type = get_filesystem_type(self.path)
+ fs_type = self._partition_info.filesystem_type
if not fs_type or "swap" in fs_type:
return False
temporary_mountpoint = '/tmp/' + hashlib.md5(bytes(f"{time.time()}", 'UTF-8') + os.urandom(12)).hexdigest()
- temporary_path = pathlib.Path(temporary_mountpoint)
+ temporary_path = Path(temporary_mountpoint)
temporary_path.mkdir(parents=True, exist_ok=True)
- if (handle := SysCommand(f'/usr/bin/mount {self.path} {temporary_mountpoint}')).exit_code != 0:
- raise DiskError(f'Could not mount and check for content on {self.path} because: {b"".join(handle)}')
+ if (handle := SysCommand(f'/usr/bin/mount {self._path} {temporary_mountpoint}')).exit_code != 0:
+ raise DiskError(f'Could not mount and check for content on {self._path} because: {handle}')
files = len(glob.glob(f"{temporary_mountpoint}/*"))
iterations = 0
@@ -366,14 +422,14 @@ class Partition:
return True if files > 0 else False
- def encrypt(self, *args :str, **kwargs :str) -> str:
+ def encrypt(self, password: Optional[str] = None) -> str:
"""
A wrapper function for luks2() instances and the .encrypt() method of that instance.
"""
from ..luks import luks2
handle = luks2(self, None, None)
- return handle.encrypt(self, *args, **kwargs)
+ return handle.encrypt(self, password=password)
def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = [], retry :bool = True) -> bool:
"""
@@ -381,17 +437,17 @@ class Partition:
the formatting functionality and in essence the support for the given filesystem.
"""
if filesystem is None:
- filesystem = self.filesystem
+ filesystem = self._partition_info.filesystem_type
if path is None:
- path = self.path
+ path = self._path
# This converts from fat32 -> vfat to unify filesystem names
filesystem = get_mount_fs_type(filesystem)
# To avoid "unable to open /dev/x: No such file or directory"
start_wait = time.time()
- while pathlib.Path(path).exists() is False and time.time() - start_wait < 10:
+ while Path(path).exists() is False and time.time() - start_wait < 10:
time.sleep(0.025)
if log_formatting:
@@ -401,57 +457,57 @@ class Partition:
if filesystem == 'btrfs':
options = ['-f'] + options
- if 'UUID:' not in (mkfs := SysCommand(f"/usr/bin/mkfs.btrfs {' '.join(options)} {path}").decode('UTF-8')):
+ mkfs = SysCommand(f"/usr/bin/mkfs.btrfs {' '.join(options)} {path}").decode('UTF-8')
+ if mkfs and 'UUID:' not in mkfs:
raise DiskError(f'Could not format {path} with {filesystem} because: {mkfs}')
- self.filesystem = filesystem
+ self._partition_info.filesystem_type = filesystem
elif filesystem == 'vfat':
options = ['-F32'] + options
-
+ log(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}")
if (handle := SysCommand(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}")).exit_code != 0:
raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = filesystem
+ self._partition_info.filesystem_type = filesystem
elif filesystem == 'ext4':
options = ['-F'] + options
if (handle := SysCommand(f"/usr/bin/mkfs.ext4 {' '.join(options)} {path}")).exit_code != 0:
raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = filesystem
+ self._partition_info.filesystem_type = filesystem
elif filesystem == 'ext2':
options = ['-F'] + options
if (handle := SysCommand(f"/usr/bin/mkfs.ext2 {' '.join(options)} {path}")).exit_code != 0:
- raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}')
- self.filesystem = 'ext2'
-
+ raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
+ self._partition_info.filesystem_type = 'ext2'
elif filesystem == 'xfs':
options = ['-f'] + options
if (handle := SysCommand(f"/usr/bin/mkfs.xfs {' '.join(options)} {path}")).exit_code != 0:
raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = filesystem
+ self._partition_info.filesystem_type = filesystem
elif filesystem == 'f2fs':
options = ['-f'] + options
if (handle := SysCommand(f"/usr/bin/mkfs.f2fs {' '.join(options)} {path}")).exit_code != 0:
raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = filesystem
+ self._partition_info.filesystem_type = filesystem
elif filesystem == 'ntfs3':
options = ['-f'] + options
if (handle := SysCommand(f"/usr/bin/mkfs.ntfs -Q {' '.join(options)} {path}")).exit_code != 0:
raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = filesystem
+ self._partition_info.filesystem_type = filesystem
elif filesystem == 'crypto_LUKS':
# from ..luks import luks2
# encrypted_partition = luks2(self, None, None)
# encrypted_partition.format(path)
- self.filesystem = filesystem
+ self._partition_info.filesystem_type = filesystem
else:
raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.")
@@ -460,13 +516,13 @@ class Partition:
if retry is True:
log(f"Retrying in {storage.get('DISK_TIMEOUTS', 1)} seconds.", level=logging.WARNING, fg="orange")
time.sleep(storage.get('DISK_TIMEOUTS', 1))
-
+
return self.format(filesystem, path, log_formatting, options, retry=False)
if get_filesystem_type(path) == 'crypto_LUKS' or get_filesystem_type(self.real_device) == 'crypto_LUKS':
- self.encrypted = True
+ self._encrypted = True
else:
- self.encrypted = False
+ self._encrypted = False
return True
@@ -478,18 +534,18 @@ class Partition:
if parent := self.find_parent_of(child, name, parent=data['name']):
return parent
+ return None
+
def mount(self, target :str, fs :Optional[str] = None, options :str = '') -> bool:
- if not self.mountpoint:
+ if not self._partition_info.get_first_mountpoint():
log(f'Mounting {self} to {target}', level=logging.INFO)
if not fs:
- if not self.filesystem:
- raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.')
- fs = self.filesystem
+ fs = self._partition_info.filesystem_type
fs_type = get_mount_fs_type(fs)
- pathlib.Path(target).mkdir(parents=True, exist_ok=True)
+ Path(target).mkdir(parents=True, exist_ok=True)
if self.bind_name:
device_path = self.device_path
@@ -499,7 +555,7 @@ class Partition:
else:
options = f"subvol={self.bind_name}"
else:
- device_path = self.path
+ device_path = self._path
try:
if options:
mnt_handle = SysCommand(f"/usr/bin/mount -t {fs_type} -o {options} {device_path} {target}")
@@ -508,7 +564,7 @@ class Partition:
# TODO: Should be redundant to check for exit_code
if mnt_handle.exit_code != 0:
- raise DiskError(f"Could not mount {self.path} to {target} using options {options}")
+ raise DiskError(f"Could not mount {self._path} to {target} using options {options}")
except SysCallError as err:
raise err
@@ -517,19 +573,17 @@ class Partition:
return False
def unmount(self) -> bool:
- worker = SysCommand(f"/usr/bin/umount {self.path}")
+ worker = SysCommand(f"/usr/bin/umount {self._path}")
+ exit_code = worker.exit_code
# Without to much research, it seams that low error codes are errors.
# And above 8k is indicators such as "/dev/x not mounted.".
# So anything in between 0 and 8k are errors (?).
- if 0 < worker.exit_code < 8000:
- raise SysCallError(f"Could not unmount {self.path} properly: {worker}", exit_code=worker.exit_code)
+ if exit_code and 0 < exit_code < 8000:
+ raise SysCallError(f"Could not unmount {self._path} properly: {worker}", exit_code=exit_code)
return True
- def umount(self) -> bool:
- return self.unmount()
-
def filesystem_supported(self) -> bool:
"""
The support for a filesystem (this partition) is tested by calling
@@ -538,7 +592,7 @@ class Partition:
2. UnknownFilesystemFormat that indicates that we don't support the given filesystem type
"""
try:
- self.format(self.filesystem, '/dev/null', log_formatting=False, allow_formatting=True)
+ self.format(self._partition_info.filesystem_type, '/dev/null', log_formatting=False)
except (SysCallError, DiskError):
pass # We supported it, but /dev/null is not formattable as expected so the mkfs call exited with an error code
except UnknownFilesystemFormat as err:
diff --git a/archinstall/lib/disk/user_guides.py b/archinstall/lib/disk/user_guides.py
index 5fa6bfdc..5809c073 100644
--- a/archinstall/lib/disk/user_guides.py
+++ b/archinstall/lib/disk/user_guides.py
@@ -3,6 +3,8 @@ import logging
from typing import Optional, Dict, Any, List, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
+from ..models.subvolume import Subvolume
+
if TYPE_CHECKING:
from .blockdevice import BlockDevice
_: Any
@@ -107,17 +109,14 @@ def suggest_single_disk_layout(block_device :BlockDevice,
# https://unix.stackexchange.com/questions/246976/btrfs-subvolume-uuid-clash
# https://github.com/classy-giraffe/easy-arch/blob/main/easy-arch.sh
layout[block_device.path]['partitions'][1]['btrfs'] = {
- "subvolumes" : {
- "@":"/",
- "@home": "/home",
- "@log": "/var/log",
- "@pkg": "/var/cache/pacman/pkg",
- "@.snapshots": "/.snapshots"
- }
+ 'subvolumes': [
+ Subvolume('@', '/'),
+ Subvolume('@home', '/home'),
+ Subvolume('@log', '/var/log'),
+ Subvolume('@pkg', '/var/cache/pacman/pkg'),
+ Subvolume('@.snapshots', '/.snapshots')
+ ]
}
- # else:
- # pass # ... implement a guided setup
-
elif using_home_partition:
# If we don't want to use subvolumes,
# But we want to be able to re-use data between re-installs..
diff --git a/archinstall/lib/disk/validators.py b/archinstall/lib/disk/validators.py
index fd1b7f33..54808886 100644
--- a/archinstall/lib/disk/validators.py
+++ b/archinstall/lib/disk/validators.py
@@ -7,13 +7,11 @@ def valid_parted_position(pos :str) -> bool:
if pos.isdigit():
return True
- if pos[-1] == '%' and pos[:-1].isdigit():
+ if pos.lower().endswith('b') and pos[:-1].isdigit():
return True
- if pos[-3:].lower() in ['mib', 'kib', 'b', 'tib'] and pos[:-3].replace(".", "", 1).isdigit():
- return True
-
- if pos[-2:].lower() in ['kb', 'mb', 'gb', 'tb'] and pos[:-2].replace(".", "", 1).isdigit():
+ if any(pos.lower().endswith(size) and pos[:-len(size)].replace(".", "", 1).isdigit()
+ for size in ['%', 'kb', 'mb', 'gb', 'tb', 'kib', 'mib', 'gib', 'tib']):
return True
return False
diff --git a/archinstall/lib/exceptions.py b/archinstall/lib/exceptions.py
index a16faa3f..a66e4e04 100644
--- a/archinstall/lib/exceptions.py
+++ b/archinstall/lib/exceptions.py
@@ -1,4 +1,7 @@
-from typing import Optional
+from typing import Optional, TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from .general import SysCommandWorker
class RequirementError(BaseException):
pass
@@ -17,10 +20,11 @@ class ProfileError(BaseException):
class SysCallError(BaseException):
- def __init__(self, message :str, exit_code :Optional[int] = None) -> None:
+ def __init__(self, message :str, exit_code :Optional[int] = None, worker :Optional['SysCommandWorker'] = None) -> None:
super(SysCallError, self).__init__(message)
self.message = message
self.exit_code = exit_code
+ self.worker = worker
class PermissionError(BaseException):
diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py
index b99e4a45..d76b7036 100644
--- a/archinstall/lib/general.py
+++ b/archinstall/lib/general.py
@@ -6,12 +6,14 @@ import os
import secrets
import shlex
import subprocess
+import stat
import string
import sys
import time
import re
import urllib.parse
import urllib.request
+import urllib.error
import pathlib
from datetime import datetime, date
from typing import Callable, Optional, Dict, Any, List, Union, Iterator, TYPE_CHECKING
@@ -37,7 +39,7 @@ else:
def unregister(self, fileno :int, *args :List[Any], **kwargs :Dict[str, Any]) -> None:
try:
- del(self.monitoring[fileno])
+ del(self.monitoring[fileno]) # noqa: E275
except:
pass
@@ -207,7 +209,7 @@ class SysCommandWorker:
self.cmd = cmd
self.callbacks = callbacks
self.peak_output = peak_output
- # define the standard locale for command outputs. For now the C ascii one. Can be overriden
+ # define the standard locale for command outputs. For now the C ascii one. Can be overridden
self.environment_vars = {**storage.get('CMD_LOCALE',{}),**environment_vars}
self.logfile = logfile
self.working_directory = working_directory
@@ -270,7 +272,7 @@ class SysCommandWorker:
log(args[1], level=logging.DEBUG, fg='red')
if self.exit_code != 0:
- raise SysCallError(f"{self.cmd} exited with abnormal exit code [{self.exit_code}]: {self._trace_log[-500:]}", self.exit_code)
+ raise SysCallError(f"{self.cmd} exited with abnormal exit code [{self.exit_code}]: {self._trace_log[-500:]}", self.exit_code, worker=self)
def is_alive(self) -> bool:
self.poll()
@@ -312,9 +314,18 @@ class SysCommandWorker:
except UnicodeDecodeError:
return False
- with open(f"{storage['LOG_PATH']}/cmd_output.txt", "a") as peak_output_log:
+ peak_logfile = pathlib.Path(f"{storage['LOG_PATH']}/cmd_output.txt")
+
+ change_perm = False
+ if peak_logfile.exists() is False:
+ change_perm = True
+
+ with peak_logfile.open("a") as peak_output_log:
peak_output_log.write(output)
+ if change_perm:
+ os.chmod(str(peak_logfile), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
+
sys.stdout.write(str(output))
sys.stdout.flush()
@@ -354,16 +365,24 @@ class SysCommandWorker:
# Note: If for any reason, we get a Python exception between here
# and until os.close(), the traceback will get locked inside
# stdout of the child_fd object. `os.read(self.child_fd, 8192)` is the
- # only way to get the traceback without loosing it.
+ # only way to get the traceback without losing it.
self.pid, self.child_fd = pty.fork()
# https://stackoverflow.com/questions/4022600/python-pty-fork-how-does-it-work
if not self.pid:
+ history_logfile = pathlib.Path(f"{storage['LOG_PATH']}/cmd_history.txt")
try:
+ change_perm = False
+ if history_logfile.exists() is False:
+ change_perm = True
+
try:
- with open(f"{storage['LOG_PATH']}/cmd_history.txt", "a") as cmd_log:
+ with history_logfile.open("a") as cmd_log:
cmd_log.write(f"{self.cmd}\n")
+
+ if change_perm:
+ os.chmod(str(history_logfile), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
except PermissionError:
pass
@@ -443,7 +462,7 @@ class SysCommand:
def __repr__(self, *args :List[Any], **kwargs :Dict[str, Any]) -> str:
if self.session:
- return self.session._trace_log.decode('UTF-8')
+ return self.session._trace_log.decode('UTF-8', errors='backslashreplace')
return ''
def __json__(self) -> Dict[str, Union[str, bool, List[str], Dict[str, Any], Optional[bool], Optional[Dict[str, Any]]]]:
@@ -547,9 +566,13 @@ def json_stream_to_structure(configuration_identifier : str, stream :str, target
parsed_url = urllib.parse.urlparse(stream)
- if parsed_url.scheme: # The stream is in fact a URL that should be grabed
- with urllib.request.urlopen(urllib.request.Request(stream, headers={'User-Agent': 'ArchInstall'})) as response:
- target.update(json.loads(response.read()))
+ if parsed_url.scheme: # The stream is in fact a URL that should be grabbed
+ try:
+ with urllib.request.urlopen(urllib.request.Request(stream, headers={'User-Agent': 'ArchInstall'})) as response:
+ target.update(json.loads(response.read()))
+ except urllib.error.HTTPError as error:
+ log(f"Could not load {configuration_identifier} via {parsed_url} due to: {error}", level=logging.ERROR, fg="red")
+ return False
else:
if pathlib.Path(stream).exists():
try:
diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py
index b2cd6306..1270959e 100644
--- a/archinstall/lib/installer.py
+++ b/archinstall/lib/installer.py
@@ -24,6 +24,7 @@ from .disk.partition import get_mount_fs_type
from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError, SysCallError
from .hsm import fido2_enroll
from .models.users import User
+from .models.subvolume import Subvolume
if TYPE_CHECKING:
_: Any
@@ -158,8 +159,6 @@ class Installer:
print(_(" Please submit this issue (and file) to https://github.com/archlinux/archinstall/issues"))
raise args[1]
- self.genfstab()
-
if not (missing_steps := self.post_install_check()):
self.log('Installation completed without any errors. You may now reboot.', fg='green', level=logging.INFO)
self.sync_log_to_install_medium()
@@ -195,7 +194,7 @@ class Installer:
return True
def _create_keyfile(self,luks_handle , partition :dict, password :str):
- """ roiutine to create keyfiles, so it can be moved elsewere
+ """ roiutine to create keyfiles, so it can be moved elsewhere
"""
if partition.get('generate-encryption-key-file'):
if not (cryptkey_dir := pathlib.Path(f"{self.target}/etc/cryptsetup-keys.d")).exists():
@@ -220,7 +219,7 @@ class Installer:
"""
if partition.get("mountpoint") is None:
if (sub_list := partition.get("btrfs",{}).get('subvolumes',{})):
- for mountpoint in [sub_list[subvolume] if isinstance(sub_list[subvolume],str) else sub_list[subvolume].get("mountpoint") for subvolume in sub_list if sub_list[subvolume]]:
+ for mountpoint in [sub_list[subvolume].get("mountpoint") if isinstance(subvolume, dict) else subvolume.mountpoint for subvolume in sub_list]:
if mountpoint == '/':
return True
return False
@@ -247,16 +246,17 @@ class Installer:
# we manage the encrypted partititons
for partition in [entry for entry in list_part if entry.get('encrypted', False)]:
# open the luks device and all associate stuff
- if not (password := partition.get('!password', None)):
- raise RequirementError(f"Missing partition {partition['device_instance'].path} encryption password in layout: {partition}")
- loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop"
- else:
- loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}"
+ if not (password := partition.get('!password', None)) and storage['arguments'].get('!encryption-password'):
+ password = storage['arguments'].get('!encryption-password')
+ elif not password:
+ raise RequirementError(f"Missing partition encryption password in layout: {partition}")
+
+ loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}"
# note that we DON'T auto_unmount (i.e. close the encrypted device so it can be used
with (luks_handle := luks2(partition['device_instance'], loopdev, password, auto_unmount=False)) as unlocked_device:
- if partition.get('generate-encryption-key-file',False) and not self._has_root(partition):
- list_luks_handles.append([luks_handle,partition,password])
+ if partition.get('generate-encryption-key-file', False) and not self._has_root(partition):
+ list_luks_handles.append([luks_handle, partition, password])
# this way all the requesrs will be to the dm_crypt device and not to the physical partition
partition['device_instance'] = unlocked_device
@@ -265,47 +265,25 @@ class Installer:
hsm_device_path = storage['arguments']['HSM']
fido2_enroll(hsm_device_path, partition['device_instance'], password)
- # we manage the btrfs partitions
- if any(btrfs_subvolumes := [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', {})]):
- for partition in btrfs_subvolumes:
- if mount_options := ','.join(partition.get('filesystem',{}).get('mount_options',[])):
- self.mount(partition['device_instance'], "/", options=mount_options)
- else:
- self.mount(partition['device_instance'], "/")
-
- setup_subvolumes(
- installation=self,
- partition_dict=partition
- )
+ btrfs_subvolumes = [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', [])]
- partition['device_instance'].unmount()
+ for partition in btrfs_subvolumes:
+ device_instance = partition['device_instance']
+ mount_options = partition.get('filesystem', {}).get('mount_options', [])
+ self.mount(device_instance, "/", options=','.join(mount_options))
+ setup_subvolumes(installation=self, partition_dict=partition)
+ device_instance.unmount()
# We then handle any special cases, such as btrfs
- if any(btrfs_subvolumes := [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', {})]):
- for partition_information in btrfs_subvolumes:
- for name, mountpoint in sorted(partition_information['btrfs']['subvolumes'].items(), key=lambda item: item[1]):
- btrfs_subvolume_information = {}
-
- match mountpoint:
- case str(): # backwards-compatability
- btrfs_subvolume_information['mountpoint'] = mountpoint
- btrfs_subvolume_information['options'] = []
- case dict():
- btrfs_subvolume_information['mountpoint'] = mountpoint.get('mountpoint', None)
- btrfs_subvolume_information['options'] = mountpoint.get('options', [])
- case _:
- continue
-
- if mountpoint_parsed := btrfs_subvolume_information.get('mountpoint'):
- # We cache the mount call for later
- mount_queue[mountpoint_parsed] = lambda device=partition_information['device_instance'], \
- name=name, \
- subvolume_information=btrfs_subvolume_information: mount_subvolume(
- installation=self,
- device=device,
- name=name,
- subvolume_information=subvolume_information
- )
+ for partition in btrfs_subvolumes:
+ subvolumes: List[Subvolume] = partition['btrfs']['subvolumes']
+ for subvolume in sorted(subvolumes, key=lambda item: item.mountpoint):
+ # We cache the mount call for later
+ mount_queue[subvolume.mountpoint] = lambda sub_vol=subvolume, device=partition['device_instance']: mount_subvolume(
+ installation=self,
+ device=device,
+ subvolume=sub_vol
+ )
# We mount ordinary partitions, and we sort them by the mountpoint
for partition in sorted([entry for entry in list_part if entry.get('mountpoint', False)], key=lambda part: part['mountpoint']):
@@ -348,7 +326,7 @@ class Installer:
def enable_multilib_repository(self):
# Set up a regular expression pattern of a commented line containing 'multilib' within []
- pattern = re.compile("^#\\[.*multilib.*\\]$")
+ pattern = re.compile(r"^#\s*\[multilib\]$")
# This is used to track if the previous line is a match, so we end up uncommenting the line after the block.
matched = False
@@ -413,7 +391,7 @@ class Installer:
try:
run_pacman('-Syy', default_cmd='/usr/bin/pacman')
except SysCallError as error:
- self.log(f'Could not sync a new package databse: {error}', level=logging.ERROR, fg="red")
+ self.log(f'Could not sync a new package database: {error}', level=logging.ERROR, fg="red")
if storage['arguments'].get('silent', False) is False:
if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'):
@@ -454,7 +432,8 @@ class Installer:
for plugin in plugins.values():
if hasattr(plugin, 'on_genfstab'):
- plugin.on_genfstab(self)
+ if plugin.on_genfstab(self) is True:
+ break
return True
@@ -466,10 +445,27 @@ class Installer:
if not len(locale):
return True
+ modifier = ''
+
+ # This is a temporary patch to fix #1200
+ if '.' in locale:
+ locale, potential_encoding = locale.split('.', 1)
+
+ # Override encoding if encoding is set to the default parameter
+ # and the "found" encoding differs.
+ if encoding == 'UTF-8' and encoding != potential_encoding:
+ encoding = potential_encoding
+
+ # Make sure we extract the modifier, that way we can put it in if needed.
+ if '@' in locale:
+ locale, modifier = locale.split('@', 1)
+ modifier = f"@{modifier}"
+ # - End patch
+
with open(f'{self.target}/etc/locale.gen', 'a') as fh:
- fh.write(f'{locale}.{encoding} {encoding}\n')
+ fh.write(f'{locale}.{encoding}{modifier} {encoding}\n')
with open(f'{self.target}/etc/locale.conf', 'w') as fh:
- fh.write(f'LANG={locale}.{encoding}\n')
+ fh.write(f'LANG={locale}.{encoding}{modifier}\n')
return True if SysCommand(f'/usr/bin/arch-chroot {self.target} locale-gen').exit_code == 0 else False
@@ -654,7 +650,7 @@ class Installer:
mkinit.write(f"BINARIES=({' '.join(self.BINARIES)})\n")
mkinit.write(f"FILES=({' '.join(self.FILES)})\n")
- if not storage['arguments']['HSM']:
+ if not storage['arguments'].get('HSM'):
# For now, if we don't use HSM we revert to the old
# way of setting up encryption hooks for mkinitcpio.
# This is purely for stability reasons, we're going away from this.
@@ -696,7 +692,7 @@ class Installer:
self.HOOKS.remove('fsck')
if self.detect_encryption(partition):
- if storage['arguments']['HSM']:
+ if storage['arguments'].get('HSM'):
# Required bby mkinitcpio to add support for fido2-device options
self.pacstrap('libfido2')
@@ -728,7 +724,7 @@ class Installer:
self.log("The multilib flag is set. This system will be installed with the multilib repository enabled.")
self.enable_multilib_repository()
else:
- self.log("The testing flag is not set. This system will be installed without testing repositories enabled.")
+ self.log("The multilib flag is not set. This system will be installed without multilib repositories enabled.")
if testing:
self.log("The testing flag is set. This system will be installed with testing repositories enabled.")
@@ -760,7 +756,7 @@ class Installer:
# TODO: Use python functions for this
SysCommand(f'/usr/bin/arch-chroot {self.target} chmod 700 /root')
- if storage['arguments']['HSM']:
+ if storage['arguments'].get('HSM'):
# TODO:
# A bit of a hack, but we need to get vconsole.conf in there
# before running `mkinitcpio` because it expects it in HSM mode.
@@ -872,7 +868,7 @@ class Installer:
options_entry = f'rw intel_pstate=no_hwp {" ".join(self.KERNEL_PARAMS)}\n'
for subvolume in root_partition.subvolumes:
- if subvolume.root is True:
+ if subvolume.root is True and subvolume.name != '<FS_TREE>':
options_entry = f"rootflags=subvol={subvolume.name} " + options_entry
# Zswap should be disabled when using zram.
@@ -888,7 +884,7 @@ class Installer:
kernel_options = f"options"
- if storage['arguments']['HSM']:
+ if storage['arguments'].get('HSM'):
# Note: lsblk UUID must be used, not PARTUUID for sd-encrypt to work
kernel_options += f" rd.luks.name={real_device.uuid}=luksdev"
# Note: tpm2-device and fido2-device don't play along very well:
@@ -1022,10 +1018,9 @@ class Installer:
boot_partition = None
root_partition = None
for partition in self.partitions:
- print(partition, [partition.mountpoint], [self.target])
- if partition.mountpoint == self.target / 'boot':
+ if self.target / 'boot' in partition.mountpoints:
boot_partition = partition
- elif partition.mountpoint == self.target:
+ elif self.target in partition.mountpoints:
root_partition = partition
if boot_partition is None or root_partition is None:
@@ -1154,7 +1149,8 @@ class Installer:
return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"").exit_code == 0
def chown(self, owner :str, path :str, options :List[str] = []) -> bool:
- return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {path}").exit_code == 0
+ cleaned_path = path.replace('\'', '\\\'')
+ return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {cleaned_path}'").exit_code == 0
def create_file(self, filename :str, owner :Optional[str] = None) -> InstallationFile:
return InstallationFile(self, filename, owner)
diff --git a/archinstall/lib/locale_helpers.py b/archinstall/lib/locale_helpers.py
index b48c3bc4..5580fa91 100644
--- a/archinstall/lib/locale_helpers.py
+++ b/archinstall/lib/locale_helpers.py
@@ -20,7 +20,7 @@ def list_locales() -> List[str]:
entries.reverse()
for entry in entries:
- text = entry[1:].strip()
+ text = entry.replace('#', '').strip()
if text == '':
break
locales.append(text)
diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py
index ac480b11..7e4534d8 100644
--- a/archinstall/lib/luks.py
+++ b/archinstall/lib/luks.py
@@ -22,9 +22,9 @@ from .disk.btrfs import BTRFSPartition
class luks2:
def __init__(self,
- partition :Partition,
- mountpoint :str,
- password :str,
+ partition: Partition,
+ mountpoint: Optional[str],
+ password: Optional[str],
key_file :Optional[str] = None,
auto_unmount :bool = False,
*args :str,
diff --git a/archinstall/lib/menu/global_menu.py b/archinstall/lib/menu/global_menu.py
index 5cb27cab..d1bec189 100644
--- a/archinstall/lib/menu/global_menu.py
+++ b/archinstall/lib/menu/global_menu.py
@@ -3,38 +3,37 @@ from __future__ import annotations
from typing import Any, List, Optional, Union, Dict, TYPE_CHECKING
import archinstall
-
-from ..menu import Menu
-from ..menu.selection_menu import Selector, GeneralMenu
+from ..disk import encrypted_partitions
from ..general import SysCommand, secret
from ..hardware import has_uefi
+from ..menu import Menu
+from ..menu.selection_menu import Selector, GeneralMenu
from ..models import NetworkConfiguration
-from ..storage import storage
+from ..models.users import User
+from ..output import FormattedOutput
from ..profiles import is_desktop_profile, Profile
-from ..disk import encrypted_partitions
-
-from ..user_interaction import get_password, ask_for_a_timezone, save_config
-from ..user_interaction import ask_ntp
-from ..user_interaction import ask_for_swap
+from ..storage import storage
+from ..user_interaction import add_number_of_parrallel_downloads
+from ..user_interaction import ask_additional_packages_to_install
+from ..user_interaction import ask_for_additional_users
+from ..user_interaction import ask_for_audio_selection
from ..user_interaction import ask_for_bootloader
+from ..user_interaction import ask_for_swap
from ..user_interaction import ask_hostname
-from ..user_interaction import ask_for_audio_selection
-from ..user_interaction import ask_additional_packages_to_install
+from ..user_interaction import ask_ntp
from ..user_interaction import ask_to_configure_network
-from ..user_interaction import ask_for_additional_users
-from ..user_interaction import select_language
-from ..user_interaction import select_mirror_regions
-from ..user_interaction import select_locale_lang
-from ..user_interaction import select_locale_enc
+from ..user_interaction import get_password, ask_for_a_timezone, save_config
+from ..user_interaction import select_additional_repositories
from ..user_interaction import select_disk_layout
-from ..user_interaction import select_kernel
from ..user_interaction import select_encrypted_partitions
from ..user_interaction import select_harddrives
+from ..user_interaction import select_kernel
+from ..user_interaction import select_language
+from ..user_interaction import select_locale_enc
+from ..user_interaction import select_locale_lang
+from ..user_interaction import select_mirror_regions
from ..user_interaction import select_profile
-from ..user_interaction import select_additional_repositories
-from ..models.users import User
from ..user_interaction.partitioning_conf import current_partition_layout
-from ..output import FormattedOutput
if TYPE_CHECKING:
_: Any
@@ -42,6 +41,7 @@ if TYPE_CHECKING:
class GlobalMenu(GeneralMenu):
def __init__(self,data_store):
+ self._disk_check = True
super().__init__(data_store=data_store, auto_cursor=True, preview_size=0.3)
def _setup_selection_menu_options(self):
@@ -50,7 +50,8 @@ class GlobalMenu(GeneralMenu):
Selector(
_('Archinstall language'),
lambda x: self._select_archinstall_language(x),
- default='English')
+ display_func=lambda x: x.display_name,
+ default=self.translation_handler.get_language_by_abbr('en'))
self._menu_options['keyboard-layout'] = \
Selector(
_('Keyboard layout'),
@@ -143,6 +144,15 @@ class GlobalMenu(GeneralMenu):
display_func=lambda x: x if x else 'None',
default=None
)
+
+ self._menu_options['parallel downloads'] = \
+ Selector(
+ _('Parallel Downloads'),
+ add_number_of_parrallel_downloads,
+ display_func=lambda x: x if x else '0',
+ default=0
+ )
+
self._menu_options['kernels'] = \
Selector(
_('Kernels'),
@@ -163,7 +173,8 @@ class GlobalMenu(GeneralMenu):
Selector(
_('Network configuration'),
ask_to_configure_network,
- display_func=lambda x: self._prev_network_configuration(x),
+ display_func=lambda x: self._display_network_conf(x),
+ preview_func=self._prev_network_config,
default={})
self._menu_options['timezone'] = \
Selector(
@@ -204,14 +215,21 @@ class GlobalMenu(GeneralMenu):
# Then we need to identify which partitions to encrypt. This will default to / (root).
if len(list(encrypted_partitions(storage['arguments'].get('disk_layouts', [])))) == 0:
for blockdevice in storage['arguments']['disk_layouts']:
- for partition_index in select_encrypted_partitions(
- title="Select which partitions to encrypt:",
- partitions=storage['arguments']['disk_layouts'][blockdevice]['partitions']
- ):
-
- partition = storage['arguments']['disk_layouts'][blockdevice]['partitions'][partition_index]
- partition['encrypted'] = True
- partition['!password'] = storage['arguments']['!encryption-password']
+ if storage['arguments']['disk_layouts'][blockdevice].get('partitions'):
+ for partition_index in select_encrypted_partitions(
+ title=_('Select which partitions to encrypt:'),
+ partitions=storage['arguments']['disk_layouts'][blockdevice]['partitions'],
+ filter_=(lambda p: p['mountpoint'] != '/boot')
+ ):
+
+ partition = storage['arguments']['disk_layouts'][blockdevice]['partitions'][partition_index]
+ partition['encrypted'] = True
+ partition['!password'] = storage['arguments']['!encryption-password']
+
+ # We make sure generate-encryption-key-file is set on additional partitions
+ # other than the root partition. Otherwise they won't unlock properly #1279
+ if partition['mountpoint'] != '/':
+ partition['generate-encryption-key-file'] = True
def _install_text(self):
missing = len(self._missing_configs())
@@ -219,21 +237,28 @@ class GlobalMenu(GeneralMenu):
return _('Install ({} config(s) missing)').format(missing)
return _('Install')
- def _prev_network_configuration(self, cur_value: Union[NetworkConfiguration, List[NetworkConfiguration]]) -> str:
+ def _display_network_conf(self, cur_value: Union[NetworkConfiguration, List[NetworkConfiguration]]) -> str:
if not cur_value:
return _('Not configured, unavailable unless setup manually')
else:
if isinstance(cur_value, list):
- ifaces = [x.iface for x in cur_value]
- return f'Configured ifaces: {ifaces}'
+ return str(_('Configured {} interfaces')).format(len(cur_value))
else:
return str(cur_value)
+ def _prev_network_config(self) -> Optional[str]:
+ selector = self._menu_options['nic']
+ if selector.has_selection():
+ ifaces = selector.current_selection
+ if isinstance(ifaces, list):
+ return FormattedOutput.as_table(ifaces)
+ return None
+
def _prev_harddrives(self) -> Optional[str]:
selector = self._menu_options['harddrives']
if selector.has_selection():
drives = selector.current_selection
- return '\n\n'.join([d.display_info for d in drives])
+ return FormattedOutput.as_table(drives)
return None
def _prev_disk_layouts(self) -> Optional[str]:
@@ -288,11 +313,12 @@ class GlobalMenu(GeneralMenu):
missing += ['Hostname']
if not check('!root-password') and not has_superuser():
missing += [str(_('Either root-password or at least 1 user with sudo privileges must be specified'))]
- if not check('harddrives'):
- missing += ['Hard drives']
- if check('harddrives'):
- if not self._menu_options['harddrives'].is_empty() and not check('disk_layouts'):
- missing += ['Disk layout']
+ if self._disk_check:
+ if not check('harddrives'):
+ missing += [str(_('Drive(s)'))]
+ if check('harddrives'):
+ if not self._menu_options['harddrives'].is_empty() and not check('disk_layouts'):
+ missing += [str(_('Disk layout'))]
return missing
@@ -318,22 +344,26 @@ class GlobalMenu(GeneralMenu):
def _select_harddrives(self, old_harddrives : list) -> List:
harddrives = select_harddrives(old_harddrives)
- if len(harddrives) == 0:
- prompt = _(
- "You decided to skip harddrive selection\nand will use whatever drive-setup is mounted at {} (experimental)\n"
- "WARNING: Archinstall won't check the suitability of this setup\n"
- "Do you wish to continue?"
- ).format(storage['MOUNT_POINT'])
+ if harddrives is not None:
+ if len(harddrives) == 0:
+ prompt = _(
+ "You decided to skip harddrive selection\nand will use whatever drive-setup is mounted at {} (experimental)\n"
+ "WARNING: Archinstall won't check the suitability of this setup\n"
+ "Do you wish to continue?"
+ ).format(storage['MOUNT_POINT'])
- choice = Menu(prompt, Menu.yes_no(), default_option=Menu.yes(), skip=False).run()
+ choice = Menu(prompt, Menu.yes_no(), default_option=Menu.yes(), skip=False).run()
- if choice.value == Menu.no():
- return self._select_harddrives(old_harddrives)
+ if choice.value == Menu.no():
+ self._disk_check = True
+ return self._select_harddrives(old_harddrives)
+ else:
+ self._disk_check = False
- # in case the harddrives got changed we have to reset the disk layout as well
- if old_harddrives != harddrives:
- self._menu_options['disk_layouts'].set_current_selection(None)
- storage['arguments']['disk_layouts'] = {}
+ # in case the harddrives got changed we have to reset the disk layout as well
+ if old_harddrives != harddrives:
+ self._menu_options['disk_layouts'].set_current_selection(None)
+ storage['arguments']['disk_layouts'] = {}
return harddrives
diff --git a/archinstall/lib/menu/list_manager.py b/archinstall/lib/menu/list_manager.py
index cb567093..ae3a6eb5 100644
--- a/archinstall/lib/menu/list_manager.py
+++ b/archinstall/lib/menu/list_manager.py
@@ -1,94 +1,7 @@
-#!/usr/bin/python
-"""
-# Purpose
-ListManager is a widget based on `menu` which allows the handling of repetitive operations in a list.
-Imagine you have a list and want to add/copy/edit/delete their elements. With this widget you will be shown the list
-```
-Vamos alla
-
-Use ESC to skip
-
-
-> uno : 1
-dos : 2
-tres : 3
-cuatro : 4
-==>
-Confirm and exit
-Cancel
-(Press "/" to search)
-```
-Once you select one of the elements of the list, you will be promted with the action to be done to the selected element
-```
-
-uno : 1
-dos : 2
-> tres : 3
-cuatro : 4
-==>
-Confirm and exit
-Cancel
-(Press "/" to search)
-
-Select an action for < {'tres': 3} >
-
-
-Add
-Copy
-Edit
-Delete
-> Cancel
-```
-You execute the action for this element (which might or not involve user interaction) and return to the list main page
-till you call one of the options `confirm and exit` which returns the modified list or `cancel` which returns the original list unchanged.
-If the list is empty one action can be defined as default (usually Add). We call it **null_action**
-YOu can also define a **default_action** which will appear below the separator, not tied to any element of the list. Barring explicit definition, default_action will be the null_action
-```
-==>
-Add
-Confirm and exit
-Cancel
-(Press "/" to search)
-```
-The default implementation can handle simple lists and a key:value dictionary. The default actions are the shown above.
-A sample of basic usage is included at the end of the source.
-
-More sophisticaded uses can be achieved by
-* changing the action list and the null_action during intialization
-```
- opciones = ListManager('Vamos alla',opciones,[str(_('Add')),str(_('Delete'))],_('Add')).run()
-```
-* And using following methods to overwrite/define user actions and other details:
-* * `reformat`. To change the appearance of the list elements
-* * `action_list`. To modify the content of the action list once an element is defined. F.i. to avoid Delete to appear for certain elements, or to add/modify action based in the value of the element.
-* * `exec_action` which contains the actual code to be executed when an action is selected
-
-The contents in the base class of this methods serve for a very basic usage, and are to be taken as samples. Thus the best use of this class would be to subclass in your code
-
-```
- class ObjectList(archinstall.ListManager):
- def __init__(prompt,list):
- self.ObjectAction = [... list of actions ...]
- self.ObjectNullAction = one ObjectAction
- super().__init__(prompt,list,ObjectActions,ObjectNullAction)
- def reformat(self):
- ... beautfy the output of the list
- def action_list(self):
- ... if you need some changes to the action list based on self.target
- def exec_action(self):
- if self.action == self.ObjectAction[0]:
- performFirstAction(self.target, ...)
-
- ...
- resultList = ObjectList(prompt,originallist).run()
-```
-
-"""
import copy
from os import system
-from typing import Union, Any, TYPE_CHECKING, Dict, Optional
+from typing import Any, TYPE_CHECKING, Dict, Optional, Tuple, List
-from .text_input import TextInput
from .menu import Menu
if TYPE_CHECKING:
@@ -98,199 +11,132 @@ if TYPE_CHECKING:
class ListManager:
def __init__(
self,
- prompt :str,
- base_list :Union[list,dict] ,
- base_actions :list = None,
- null_action :str = None,
- default_action :Union[str,list] = None,
- header :Union[str,list] = None):
+ prompt: str,
+ entries: List[Any],
+ base_actions: List[str],
+ sub_menu_actions: List[str]
+ ):
"""
- param :prompt Text which will appear at the header
+ :param prompt: Text which will appear at the header
type param: string | DeferredTranslation
- param :base:_list list/dict of option to be shown / mainpulated
- type param: list | dict
-
- param base_actions an alternate list of actions to the items of the object
+ :param entries: list/dict of option to be shown / manipulated
type param: list
- param: null_action action which will be taken (if any) when base_list is empty
- type param: string
-
- param: default_action action which will be presented at the bottom of the list. Shouldn't need a target. If not present, null_action is set there.
- Both Null and Default actions can be defined outside the base_actions list, as long as they are launched in exec_action
- type param: string or list
+ :param base_actions: list of actions that is displayed in the main list manager,
+ usually global actions such as 'Add...'
+ type param: list
- param: header one or more header lines for the list
- type param: string or list
+ :param sub_menu_actions: list of actions available for a chosen entry
+ type param: list
"""
+ self._original_data = copy.deepcopy(entries)
+ self._data = copy.deepcopy(entries)
explainer = str(_('\n Choose an object from the list, and select one of the available actions for it to execute'))
self._prompt = prompt + explainer if prompt else explainer
- self._null_action = str(null_action) if null_action else None
+ self._separator = ''
+ self._confirm_action = str(_('Confirm and exit'))
+ self._cancel_action = str(_('Cancel'))
- if not default_action:
- self._default_action = [self._null_action]
- elif isinstance(default_action,(list,tuple)):
- self._default_action = default_action
- else:
- self._default_action = [str(default_action),]
+ self._terminate_actions = [self._confirm_action, self._cancel_action]
+ self._base_actions = base_actions
+ self._sub_menu_actions = sub_menu_actions
- self.header = header if header else None
- self.cancel_action = str(_('Cancel'))
- self.confirm_action = str(_('Confirm and exit'))
- self.separator = ''
- self.bottom_list = [self.confirm_action,self.cancel_action]
- self.bottom_item = [self.cancel_action]
- self.base_actions = base_actions if base_actions else [str(_('Add')),str(_('Copy')),str(_('Edit')),str(_('Delete'))]
- self._original_data = copy.deepcopy(base_list)
- self._data = copy.deepcopy(base_list) # as refs, changes are immediate
- # default values for the null case
- self.target: Optional[Any] = None
- self.action = self._null_action
+ self._last_choice = None
- if len(self._data) == 0 and self._null_action:
- self._data = self.exec_action(self._data)
+ @property
+ def last_choice(self):
+ return self._last_choice
def run(self):
while True:
# this will return a dictionary with the key as the menu entry to be displayed
# and the value is the original value from the self._data container
data_formatted = self.reformat(self._data)
- options = list(data_formatted.keys())
- options.append(self.separator)
-
- if self._default_action:
- options += self._default_action
-
- options += self.bottom_list
+ options, header = self._prepare_selection(data_formatted)
system('clear')
- target = Menu(
+ choice = Menu(
self._prompt,
options,
sort=False,
clear_screen=False,
clear_menu_on_exit=False,
- header=self.header,
+ header=header,
skip_empty_entries=True,
- skip=False
+ skip=False,
+ show_search_hint=False
).run()
- if not target.value or target.value in self.bottom_list:
- self.action = target
+ if choice.value in self._base_actions:
+ self._data = self.handle_action(choice.value, None, self._data)
+ elif choice.value in self._terminate_actions:
break
+ else: # an entry of the existing selection was choosen
+ selected_entry = data_formatted[choice.value]
+ self._run_actions_on_entry(selected_entry)
- if target.value and target.value in self._default_action:
- self.action = target.value
- self.target = None
- self._data = self.exec_action(self._data)
- continue
-
- if isinstance(self._data,dict):
- data_key = data_formatted[target.value]
- key = self._data[data_key]
- self.target = {data_key: key}
- elif isinstance(self._data, list):
- self.target = [d for d in self._data if d == data_formatted[target.value]][0]
- else:
- self.target = self._data[data_formatted[target.value]]
-
- # Possible enhacement. If run_actions returns false a message line indicating the failure
- self.run_actions(target.value)
-
- if target.value == self.cancel_action: # TODO dubious
+ self._last_choice = choice
+ if choice.value == self._cancel_action:
return self._original_data # return the original list
else:
return self._data
- def run_actions(self,prompt_data=None):
- options = self.action_list() + self.bottom_item
- prompt = _("Select an action for < {} >").format(prompt_data if prompt_data else self.target)
+ def _prepare_selection(self, data_formatted: Dict[str, Any]) -> Tuple[List[str], str]:
+ # header rows are mapped to None so make sure
+ # to exclude those from the selectable data
+ options: List[str] = [key for key, val in data_formatted.items() if val is not None]
+ header = ''
+
+ if len(options) > 0:
+ table_header = [key for key, val in data_formatted.items() if val is None]
+ header = '\n'.join(table_header)
+
+ if len(options) > 0:
+ options.append(self._separator)
+
+ options += self._base_actions
+ options += self._terminate_actions
+
+ return options, header
+
+ def _run_actions_on_entry(self, entry: Any):
+ options = self.filter_options(entry,self._sub_menu_actions) + [self._cancel_action]
+ display_value = self.selected_action_display(entry)
+
+ prompt = _("Select an action for '{}'").format(display_value)
+
choice = Menu(
prompt,
options,
sort=False,
clear_screen=False,
clear_menu_on_exit=False,
- preset_values=self.bottom_item,
show_search_hint=False
).run()
- self.action = choice.value
+ if choice.value and choice.value != self._cancel_action:
+ self._data = self.handle_action(choice.value, entry, self._data)
- if self.action and self.action != self.cancel_action:
- self._data = self.exec_action(self._data)
+ def selected_action_display(self, selection: Any) -> str:
+ # this will return the value to be displayed in the
+ # "Select an action for '{}'" string
+ raise NotImplementedError('Please implement me in the child class')
- """
- The following methods are expected to be overwritten by the user if the needs of the list are beyond the simple case
- """
+ def reformat(self, data: List[Any]) -> Dict[str, Any]:
+ # this should return a dictionary of display string to actual data entry
+ # mapping; if the value for a given display string is None it will be used
+ # in the header value (useful when displaying tables)
+ raise NotImplementedError('Please implement me in the child class')
- def reformat(self, data: Any) -> Dict[str, Any]:
- """
- method to get the data in a format suitable to be shown
- It is executed once for run loop and processes the whole self._data structure
- """
- if isinstance(data,dict):
- return {f'{k}: {v}': k for k, v in data.items()}
- else:
- return {str(k): k for k in data}
-
- def action_list(self):
- """
- can define alternate action list or customize the list for each item.
- Executed after any item is selected, contained in self.target
- """
- return self.base_actions
-
- def exec_action(self, data: Any):
- """
- what's executed one an item (self.target) and one action (self.action) is selected.
- Should be overwritten by the user
- The result is expected to update self._data in this routine, else it is ignored
- The basic code is useful for simple lists and dictionaries (key:value pairs, both strings)
- """
- # TODO guarantee unicity
- if isinstance(self._data,list):
- if self.action == str(_('Add')):
- self.target = TextInput(_('Add: '),None).run()
- self._data.append(self.target)
- if self.action == str(_('Copy')):
- while True:
- target = TextInput(_('Copy to: '),self.target).run()
- if target != self.target:
- self._data.append(self.target)
- break
- elif self.action == str(_('Edit')):
- tgt = self.target
- idx = self._data.index(self.target)
- result = TextInput(_('Edit: '),tgt).run()
- self._data[idx] = result
- elif self.action == str(_('Delete')):
- del self._data[self._data.index(self.target)]
- elif isinstance(self._data,dict):
- # allows overwrites
- if self.target:
- origkey,origval = list(self.target.items())[0]
- else:
- origkey = None
- origval = None
- if self.action == str(_('Add')):
- key = TextInput(_('Key: '),None).run()
- value = TextInput(_('Value: '),None).run()
- self._data[key] = value
- if self.action == str(_('Copy')):
- while True:
- key = TextInput(_('Copy to new key:'),origkey).run()
- if key != origkey:
- self._data[key] = origval
- break
- elif self.action == str(_('Edit')):
- value = TextInput(_('Edit {}: ').format(origkey), origval).run()
- self._data[origkey] = value
- elif self.action == str(_('Delete')):
- del self._data[origkey]
+ def handle_action(self, action: Any, entry: Optional[Any], data: List[Any]) -> List[Any]:
+ # this function is called when a base action or
+ # a specific action for an entry is triggered
+ raise NotImplementedError('Please implement me in the child class')
- return self._data
+ def filter_options(self, selection :Any, options :List[str]) -> List[str]:
+ # filter which actions to show for an specific selection
+ return options
diff --git a/archinstall/lib/menu/menu.py b/archinstall/lib/menu/menu.py
index c34814eb..112bc0ae 100644
--- a/archinstall/lib/menu/menu.py
+++ b/archinstall/lib/menu/menu.py
@@ -1,6 +1,7 @@
from dataclasses import dataclass
from enum import Enum, auto
-from typing import Dict, List, Union, Any, TYPE_CHECKING, Optional
+from os import system
+from typing import Dict, List, Union, Any, TYPE_CHECKING, Optional, Callable
from archinstall.lib.menu.simple_menu import TerminalMenu
@@ -51,13 +52,17 @@ class Menu(TerminalMenu):
sort :bool = True,
preset_values :Union[str, List[str]] = None,
cursor_index : Optional[int] = None,
- preview_command=None,
- preview_size=0.75,
- preview_title='Info',
+ preview_command: Optional[Callable] = None,
+ preview_size: float = 0.75,
+ preview_title: str = 'Info',
header :Union[List[str],str] = None,
- explode_on_interrupt :bool = False,
- explode_warning :str = '',
- **kwargs
+ raise_error_on_interrupt :bool = False,
+ raise_error_warning_msg :str = '',
+ clear_screen: bool = True,
+ show_search_hint: bool = True,
+ cycle_cursor: bool = True,
+ clear_menu_on_exit: bool = True,
+ skip_empty_entries: bool = False
):
"""
Creates a new menu
@@ -99,10 +104,10 @@ class Menu(TerminalMenu):
param header: one or more header lines for the menu
type param: string or list
- param explode_on_interrupt: This will explicitly handle a ctrl+c instead and return that specific state
+ param raise_error_on_interrupt: This will explicitly handle a ctrl+c instead and return that specific state
type param: bool
- param explode_warning: If explode_on_interrupt is True and this is non-empty, there will be a warning with a user confirmation displayed
+ param raise_error_warning_msg: If raise_error_on_interrupt is True and this is non-empty, there will be a warning with a user confirmation displayed
type param: str
:param kwargs : any SimpleTerminal parameter
@@ -115,7 +120,7 @@ class Menu(TerminalMenu):
# We check that the options are iterable. If not we abort. Else we copy them to lists
# it options is a dictionary we use the values as entries of the list
# if options is a string object, each character becomes an entry
- # if options is a list, we implictily build a copy to mantain immutability
+ # if options is a list, we implictily build a copy to maintain immutability
if not isinstance(p_options,Iterable):
log(f"Objects of type {type(p_options)} is not iterable, and are not supported at Menu",fg="red")
log(f"invalid parameter at Menu() call was at <{sys._getframe(1).f_code.co_name}>",level=logging.WARNING)
@@ -145,27 +150,30 @@ class Menu(TerminalMenu):
self._skip = skip
self._default_option = default_option
self._multi = multi
- self._explode_on_interrupt = explode_on_interrupt
- self._explode_warning = explode_warning
+ self._raise_error_on_interrupt = raise_error_on_interrupt
+ self._raise_error_warning_msg = raise_error_warning_msg
+ self._preview_command = preview_command
menu_title = f'\n{title}\n\n'
if header:
if not isinstance(header,(list,tuple)):
header = [header]
- header = '\n'.join(header)
- menu_title += f'\n{header}\n'
+ menu_title += '\n'.join(header)
action_info = ''
if skip:
- action_info += str(_("Use ESC to skip"))
+ action_info += str(_('ESC to skip'))
- if self._explode_on_interrupt:
- if len(action_info) > 0:
- action_info += '\n'
- action_info += str(_('Use CTRL+C to reset current selection\n\n'))
+ if self._raise_error_on_interrupt:
+ action_info += ', ' if len(action_info) > 0 else ''
+ action_info += str(_('CTRL+C to reset'))
- menu_title += action_info
+ if multi:
+ action_info += ', ' if len(action_info) > 0 else ''
+ action_info += str(_('TAB to select'))
+
+ menu_title += action_info + '\n'
if default_option:
# if a default value was specified we move that one
@@ -178,10 +186,6 @@ class Menu(TerminalMenu):
cursor = "> "
main_menu_cursor_style = ("fg_cyan", "bold")
main_menu_style = ("bg_blue", "fg_gray")
- # defaults that can be changed up the stack
- kwargs['clear_screen'] = kwargs.get('clear_screen',True)
- kwargs['show_search_hint'] = kwargs.get('show_search_hint',True)
- kwargs['cycle_cursor'] = kwargs.get('cycle_cursor',True)
super().__init__(
menu_entries=self._menu_options,
@@ -195,12 +199,16 @@ class Menu(TerminalMenu):
# show_search_hint=True,
preselected_entries=self.preset_values,
cursor_index=self.cursor_index,
- preview_command=preview_command,
+ preview_command=lambda x: self._preview_wrapper(preview_command, x),
preview_size=preview_size,
preview_title=preview_title,
- explode_on_interrupt=self._explode_on_interrupt,
+ raise_error_on_interrupt=self._raise_error_on_interrupt,
multi_select_select_on_accept=False,
- **kwargs,
+ clear_screen=clear_screen,
+ show_search_hint=show_search_hint,
+ cycle_cursor=cycle_cursor,
+ clear_menu_on_exit=clear_menu_on_exit,
+ skip_empty_entries=skip_empty_entries
)
def _show(self) -> MenuSelection:
@@ -228,16 +236,24 @@ class Menu(TerminalMenu):
else:
return MenuSelection(type_=MenuSelectionType.Esc)
+ def _preview_wrapper(self, preview_command: Optional[Callable], current_selection: str) -> Optional[str]:
+ if preview_command:
+ if self._default_option is not None and f'{self._default_option} {self._default_str}' == current_selection:
+ current_selection = self._default_option
+ return preview_command(current_selection)
+ return None
+
def run(self) -> MenuSelection:
ret = self._show()
if ret.type_ == MenuSelectionType.Ctrl_c:
- if self._explode_on_interrupt and len(self._explode_warning) > 0:
- response = Menu(self._explode_warning, Menu.yes_no(), skip=False).run()
+ if self._raise_error_on_interrupt and len(self._raise_error_warning_msg) > 0:
+ response = Menu(self._raise_error_warning_msg, Menu.yes_no(), skip=False).run()
if response.value == Menu.no():
return self.run()
if ret.type_ is not MenuSelectionType.Selection and not self._skip:
+ system('clear')
return self.run()
return ret
diff --git a/archinstall/lib/menu/selection_menu.py b/archinstall/lib/menu/selection_menu.py
index 57e290f1..8a08812c 100644
--- a/archinstall/lib/menu/selection_menu.py
+++ b/archinstall/lib/menu/selection_menu.py
@@ -8,22 +8,15 @@ from typing import Callable, Any, List, Iterator, Tuple, Optional, Dict, TYPE_CH
from .menu import Menu, MenuSelectionType
from ..locale_helpers import set_keyboard_language
from ..output import log
-from ..translation import Translation
+from ..translationhandler import TranslationHandler, Language
from ..hsm.fido import get_fido2_devices
+from ..user_interaction.general_conf import select_archinstall_language
+
if TYPE_CHECKING:
_: Any
-def select_archinstall_language(preset_value: str) -> Optional[Any]:
- """
- copied from user_interaction/general_conf.py as a temporary measure
- """
- languages = Translation.get_available_lang()
- language = Menu(_('Archinstall language'), languages, preset_values=preset_value).run()
- return language.value
-
-
class Selector:
def __init__(
self,
@@ -190,13 +183,18 @@ class GeneralMenu:
"""
self._enabled_order :List[str] = []
- self._translation = Translation.load_nationalization()
+ self._translation_handler = TranslationHandler()
self.is_context_mgr = False
self._data_store = data_store if data_store is not None else {}
self.auto_cursor = auto_cursor
self._menu_options: Dict[str, Selector] = {}
self._setup_selection_menu_options()
self.preview_size = preview_size
+ self._last_choice = None
+
+ @property
+ def last_choice(self):
+ return self._last_choice
def __enter__(self, *args :Any, **kwargs :Any) -> GeneralMenu:
self.is_context_mgr = True
@@ -217,9 +215,13 @@ class GeneralMenu:
self.exit_callback()
+ @property
+ def translation_handler(self) -> TranslationHandler:
+ return self._translation_handler
+
def _setup_selection_menu_options(self):
""" Define the menu options.
- Menu options can be defined here in a subclass or done per progam calling self.set_option()
+ Menu options can be defined here in a subclass or done per program calling self.set_option()
"""
return
@@ -227,7 +229,7 @@ class GeneralMenu:
""" will be called before each action in the menu """
return
- def post_callback(self, selector_name :str, value :Any):
+ def post_callback(self, selection_name: str = None, value: Any = None):
""" will be called after each action in the menu """
return True
@@ -327,12 +329,16 @@ class GeneralMenu:
break
cursor_pos += 1
- value = value.strip()
+ value = value.strip()
- # if this calls returns false, we exit the menu
- # we allow for an callback for special processing on realeasing control
- if not self._process_selection(value):
- break
+ # if this calls returns false, we exit the menu
+ # we allow for an callback for special processing on realeasing control
+ if not self._process_selection(value):
+ break
+
+ # we get the last action key
+ actions = {str(v.description):k for k,v in self._menu_options.items()}
+ self._last_choice = actions[selection.value.strip()]
if not self.is_context_mgr:
self.__exit__()
@@ -347,7 +353,7 @@ class GeneralMenu:
return self.exec_option(config_name, selector)
def exec_option(self, config_name :str, p_selector :Selector = None) -> bool:
- """ processes the exection of a given menu entry
+ """ processes the execution of a given menu entry
- pre process callback
- selection function
- post process callback
@@ -461,13 +467,10 @@ class GeneralMenu:
mandatory_waiting += 1
return mandatory_fields, mandatory_waiting
- def _select_archinstall_language(self, preset_value: str) -> str:
- language = select_archinstall_language(preset_value)
- if language is not None:
- self._translation.activate(language)
- return language
-
- return preset_value
+ def _select_archinstall_language(self, preset_value: Language) -> Language:
+ language = select_archinstall_language(self.translation_handler.translated_languages, preset_value)
+ self._translation_handler.activate(language)
+ return language
def _select_hsm(self, preset :Optional[pathlib.Path] = None) -> Optional[pathlib.Path]:
title = _('Select which partitions to mark for formatting:')
diff --git a/archinstall/lib/menu/simple_menu.py b/archinstall/lib/menu/simple_menu.py
index 947259eb..1980e2ce 100644
--- a/archinstall/lib/menu/simple_menu.py
+++ b/archinstall/lib/menu/simple_menu.py
@@ -65,7 +65,7 @@ __author__ = "Ingo Meyer"
__email__ = "i.meyer@fz-juelich.de"
__copyright__ = "Copyright © 2021 Forschungszentrum Jülich GmbH. All rights reserved."
__license__ = "MIT"
-__version_info__ = (1, 4, 1)
+__version_info__ = (1, 5, 0)
__version__ = ".".join(map(str, __version_info__))
@@ -86,6 +86,7 @@ DEFAULT_MULTI_SELECT_SELECT_ON_ACCEPT = True
DEFAULT_PREVIEW_BORDER = True
DEFAULT_PREVIEW_SIZE = 0.25
DEFAULT_PREVIEW_TITLE = "preview"
+DEFAULT_QUIT_KEYS = ("escape", "q")
DEFAULT_SEARCH_CASE_SENSITIVE = False
DEFAULT_SEARCH_HIGHLIGHT_STYLE = ("fg_black", "bg_yellow", "bold")
DEFAULT_SEARCH_KEY = "/"
@@ -581,6 +582,8 @@ class TerminalMenu:
preview_command: Optional[Union[str, Callable[[str], str]]] = None,
preview_size: float = DEFAULT_PREVIEW_SIZE,
preview_title: str = DEFAULT_PREVIEW_TITLE,
+ quit_keys: Iterable[str] = DEFAULT_QUIT_KEYS,
+ raise_error_on_interrupt: bool = False,
search_case_sensitive: bool = DEFAULT_SEARCH_CASE_SENSITIVE,
search_highlight_style: Optional[Iterable[str]] = DEFAULT_SEARCH_HIGHLIGHT_STYLE,
search_key: Optional[str] = DEFAULT_SEARCH_KEY,
@@ -596,8 +599,7 @@ class TerminalMenu:
status_bar: Optional[Union[str, Iterable[str], Callable[[str], str]]] = None,
status_bar_below_preview: bool = DEFAULT_STATUS_BAR_BELOW_PREVIEW,
status_bar_style: Optional[Iterable[str]] = DEFAULT_STATUS_BAR_STYLE,
- title: Optional[Union[str, Iterable[str]]] = None,
- explode_on_interrupt: bool = False
+ title: Optional[Union[str, Iterable[str]]] = None
):
def extract_shortcuts_menu_entries_and_preview_arguments(
entries: Iterable[str],
@@ -716,10 +718,11 @@ class TerminalMenu:
self._preview_command = preview_command
self._preview_size = preview_size
self._preview_title = preview_title
+ self._quit_keys = tuple(quit_keys)
+ self._raise_error_on_interrupt = raise_error_on_interrupt
self._search_case_sensitive = search_case_sensitive
self._search_highlight_style = tuple(search_highlight_style) if search_highlight_style is not None else ()
self._search_key = search_key
- self._explode_on_interrupt = explode_on_interrupt
self._shortcut_brackets_highlight_style = (
tuple(shortcut_brackets_highlight_style) if shortcut_brackets_highlight_style is not None else ()
)
@@ -787,6 +790,7 @@ class TerminalMenu:
# backspace can be queried from the terminal database but is unreliable, query the terminal directly instead
self._init_backspace_control_character()
self._add_missing_control_characters_for_keys(self._accept_keys)
+ self._add_missing_control_characters_for_keys(self._quit_keys)
self._init_terminal_codes()
@staticmethod
@@ -1477,7 +1481,7 @@ class TerminalMenu:
"menu_down": set(("down", "ctrl-j", "j")),
"accept": set(self._accept_keys),
"multi_select": set(self._multi_select_keys),
- "quit": set(("escape", "q")),
+ "quit": set(self._quit_keys),
"search_start": set((self._search_key,)),
"backspace": set(("backspace",)),
} # type: Dict[str, Set[Optional[str]]]
@@ -1541,7 +1545,7 @@ class TerminalMenu:
# `search_start` key
self._search.search_text += next_key
except KeyboardInterrupt as e:
- if self._explode_on_interrupt:
+ if self._raise_error_on_interrupt:
raise e
menu_was_interrupted = True
finally:
@@ -1846,12 +1850,6 @@ def get_argumentparser() -> argparse.ArgumentParser:
)
parser.add_argument("-t", "--title", action="store", dest="title", help="menu title")
parser.add_argument(
- "--explode-on-interrupt",
- action="store_true",
- dest="explode_on_interrupt",
- help="Instead of quitting the menu, this will raise the KeyboardInterrupt Exception",
- )
- parser.add_argument(
"-V", "--version", action="store_true", dest="print_version", help="print the version number and exit"
)
parser.add_argument("entries", action="store", nargs="*", help="the menu entries to show")
@@ -1981,7 +1979,6 @@ def main() -> None:
status_bar_below_preview=args.status_bar_below_preview,
status_bar_style=args.status_bar_style,
title=args.title,
- explode_on_interrupt=args.explode_on_interrupt,
)
except (InvalidParameterCombinationError, InvalidStyleError, UnknownMenuEntryError) as e:
print(str(e), file=sys.stderr)
diff --git a/archinstall/lib/mirrors.py b/archinstall/lib/mirrors.py
index e4917f5e..f78a8b18 100644
--- a/archinstall/lib/mirrors.py
+++ b/archinstall/lib/mirrors.py
@@ -1,10 +1,12 @@
import logging
+import pathlib
import urllib.error
import urllib.request
from typing import Union, Mapping, Iterable, Dict, Any, List
from .general import SysCommand
from .output import log
+from .storage import storage
def sort_mirrorlist(raw_data :bytes, sort_order=["https", "http"]) -> bytes:
"""
@@ -144,16 +146,22 @@ def re_rank_mirrors(
def list_mirrors(sort_order :List[str] = ["https", "http"]) -> Dict[str, Any]:
- url = "https://archlinux32.org/mirrorlist/?protocol=https&protocol=http&ip_version=4&ip_version=6&use_mirror_status=on"
regions = {}
- try:
- response = urllib.request.urlopen(url)
- except urllib.error.URLError as err:
- log(f'Could not fetch an active mirror-list: {err}', level=logging.WARNING, fg="orange")
- return regions
+ if storage['arguments']['offline']:
+ with pathlib.Path('/etc/pacman.d/mirrorlist').open('rb') as fh:
+ mirrorlist = fh.read()
+ else:
+ url = "https://archlinux32.org/mirrorlist/?protocol=https&protocol=http&ip_version=4&ip_version=6&use_mirror_status=on"
+
+ try:
+ response = urllib.request.urlopen(url)
+ except urllib.error.URLError as err:
+ log(f'Could not fetch an active mirror-list: {err}', level=logging.WARNING, fg="orange")
+ return regions
+
+ mirrorlist = response.read()
- mirrorlist = response.read()
if sort_order:
mirrorlist = sort_mirrorlist(mirrorlist, sort_order=sort_order)
@@ -170,5 +178,10 @@ def list_mirrors(sort_order :List[str] = ["https", "http"]) -> Dict[str, Any]:
url = line.lstrip('#Server = ')
regions[region][url] = True
+ elif line.startswith('Server = '):
+ regions.setdefault(region, {})
+
+ url = line.lstrip('Server = ')
+ regions[region][url] = True
return regions
diff --git a/archinstall/lib/models/network_configuration.py b/archinstall/lib/models/network_configuration.py
index 4f135da5..e026e97b 100644
--- a/archinstall/lib/models/network_configuration.py
+++ b/archinstall/lib/models/network_configuration.py
@@ -39,8 +39,22 @@ class NetworkConfiguration:
else:
return 'Unknown type'
- # for json serialization when calling json.dumps(...) on this class
- def json(self):
+ def as_json(self) -> Dict:
+ exclude_fields = ['type']
+ data = {}
+ for k, v in self.__dict__.items():
+ if k not in exclude_fields:
+ if isinstance(v, list) and len(v) == 0:
+ v = ''
+ elif v is None:
+ v = ''
+
+ data[k] = v
+
+ return data
+
+ def json(self) -> Dict:
+ # for json serialization when calling json.dumps(...) on this class
return self.__dict__
def is_iso(self) -> bool:
@@ -111,19 +125,10 @@ class NetworkConfigurationHandler:
else: # not recognized
return None
- def _parse_manual_config(self, config: Dict[str, Any]) -> Union[None, List[NetworkConfiguration]]:
- manual_configs: List = config.get('config', [])
-
- if not manual_configs:
- return None
-
- if not isinstance(manual_configs, list):
- log(_('Manual configuration setting must be a list'))
- exit(1)
-
+ def _parse_manual_config(self, configs: List[Dict[str, Any]]) -> Optional[List[NetworkConfiguration]]:
configurations = []
- for manual_config in manual_configs:
+ for manual_config in configs:
iface = manual_config.get('iface', None)
if iface is None:
@@ -135,7 +140,7 @@ class NetworkConfigurationHandler:
NetworkConfiguration(NicType.MANUAL, iface=iface)
)
else:
- ip = config.get('ip', '')
+ ip = manual_config.get('ip', '')
if not ip:
log(_('Manual nic configuration with no auto DHCP requires an IP address'), fg='red')
exit(1)
@@ -145,32 +150,34 @@ class NetworkConfigurationHandler:
NicType.MANUAL,
iface=iface,
ip=ip,
- gateway=config.get('gateway', ''),
- dns=config.get('dns', []),
+ gateway=manual_config.get('gateway', ''),
+ dns=manual_config.get('dns', []),
dhcp=False
)
)
return configurations
- def parse_arguments(self, config: Any):
- nic_type = config.get('type', None)
-
- if not nic_type:
- # old style definitions
- network_config = self._backwards_compability_config(config)
- if network_config:
- return network_config
- return None
-
+ def _parse_nic_type(self, nic_type: str) -> NicType:
try:
- type_ = NicType(nic_type)
+ return NicType(nic_type)
except ValueError:
options = [e.value for e in NicType]
log(_('Unknown nic type: {}. Possible values are {}').format(nic_type, options), fg='red')
exit(1)
- if type_ != NicType.MANUAL:
- self._configuration = NetworkConfiguration(type_)
- else: # manual configuration settings
+ def parse_arguments(self, config: Any):
+ if isinstance(config, list): # new data format
self._configuration = self._parse_manual_config(config)
+ elif nic_type := config.get('type', None): # new data format
+ type_ = self._parse_nic_type(nic_type)
+
+ if type_ != NicType.MANUAL:
+ self._configuration = NetworkConfiguration(type_)
+ else: # manual configuration settings
+ self._configuration = self._parse_manual_config([config])
+ else: # old style definitions
+ network_config = self._backwards_compability_config(config)
+ if network_config:
+ return network_config
+ return None
diff --git a/archinstall/lib/models/password_strength.py b/archinstall/lib/models/password_strength.py
new file mode 100644
index 00000000..61986bf0
--- /dev/null
+++ b/archinstall/lib/models/password_strength.py
@@ -0,0 +1,85 @@
+from enum import Enum
+
+
+class PasswordStrength(Enum):
+ VERY_WEAK = 'very weak'
+ WEAK = 'weak'
+ MODERATE = 'moderate'
+ STRONG = 'strong'
+
+ @property
+ def value(self):
+ match self:
+ case PasswordStrength.VERY_WEAK: return str(_('very weak'))
+ case PasswordStrength.WEAK: return str(_('weak'))
+ case PasswordStrength.MODERATE: return str(_('moderate'))
+ case PasswordStrength.STRONG: return str(_('strong'))
+
+ def color(self):
+ match self:
+ case PasswordStrength.VERY_WEAK: return 'red'
+ case PasswordStrength.WEAK: return 'red'
+ case PasswordStrength.MODERATE: return 'yellow'
+ case PasswordStrength.STRONG: return 'green'
+
+ @classmethod
+ def strength(cls, password: str) -> 'PasswordStrength':
+ digit = any(character.isdigit() for character in password)
+ upper = any(character.isupper() for character in password)
+ lower = any(character.islower() for character in password)
+ symbol = any(not character.isalnum() for character in password)
+ return cls._check_password_strength(digit, upper, lower, symbol, len(password))
+
+ @classmethod
+ def _check_password_strength(
+ cls,
+ digit: bool,
+ upper: bool,
+ lower: bool,
+ symbol: bool,
+ length: int
+ ) -> 'PasswordStrength':
+ # suggested evaluation
+ # https://github.com/archlinux/archinstall/issues/1304#issuecomment-1146768163
+ if digit and upper and lower and symbol:
+ match length:
+ case num if 13 <= num:
+ return PasswordStrength.STRONG
+ case num if 11 <= num <= 12:
+ return PasswordStrength.MODERATE
+ case num if 7 <= num <= 10:
+ return PasswordStrength.WEAK
+ case num if num <= 6:
+ return PasswordStrength.VERY_WEAK
+ elif digit and upper and lower:
+ match length:
+ case num if 14 <= num:
+ return PasswordStrength.STRONG
+ case num if 11 <= num <= 13:
+ return PasswordStrength.MODERATE
+ case num if 7 <= num <= 10:
+ return PasswordStrength.WEAK
+ case num if num <= 6:
+ return PasswordStrength.VERY_WEAK
+ elif upper and lower:
+ match length:
+ case num if 15 <= num:
+ return PasswordStrength.STRONG
+ case num if 12 <= num <= 14:
+ return PasswordStrength.MODERATE
+ case num if 7 <= num <= 11:
+ return PasswordStrength.WEAK
+ case num if num <= 6:
+ return PasswordStrength.VERY_WEAK
+ elif lower or upper:
+ match length:
+ case num if 18 <= num:
+ return PasswordStrength.STRONG
+ case num if 14 <= num <= 17:
+ return PasswordStrength.MODERATE
+ case num if 9 <= num <= 13:
+ return PasswordStrength.WEAK
+ case num if num <= 8:
+ return PasswordStrength.VERY_WEAK
+
+ return PasswordStrength.VERY_WEAK
diff --git a/archinstall/lib/models/subvolume.py b/archinstall/lib/models/subvolume.py
new file mode 100644
index 00000000..34a09227
--- /dev/null
+++ b/archinstall/lib/models/subvolume.py
@@ -0,0 +1,68 @@
+from dataclasses import dataclass
+from typing import List, Any, Dict
+
+
+@dataclass
+class Subvolume:
+ name: str
+ mountpoint: str
+ compress: bool = False
+ nodatacow: bool = False
+
+ def display(self) -> str:
+ options_str = ','.join(self.options)
+ return f'{_("Subvolume")}: {self.name:15} {_("Mountpoint")}: {self.mountpoint:20} {_("Options")}: {options_str}'
+
+ @property
+ def options(self) -> List[str]:
+ options = [
+ 'compress' if self.compress else '',
+ 'nodatacow' if self.nodatacow else ''
+ ]
+ return [o for o in options if len(o)]
+
+ def json(self) -> Dict[str, Any]:
+ return {
+ 'name': self.name,
+ 'mountpoint': self.mountpoint,
+ 'compress': self.compress,
+ 'nodatacow': self.nodatacow
+ }
+
+ @classmethod
+ def _parse(cls, config_subvolumes: List[Dict[str, Any]]) -> List['Subvolume']:
+ subvolumes = []
+ for entry in config_subvolumes:
+ if not entry.get('name', None) or not entry.get('mountpoint', None):
+ continue
+
+ subvolumes.append(
+ Subvolume(
+ entry['name'],
+ entry['mountpoint'],
+ entry.get('compress', False),
+ entry.get('nodatacow', False)
+ )
+ )
+
+ return subvolumes
+
+ @classmethod
+ def _parse_backwards_compatible(cls, config_subvolumes) -> List['Subvolume']:
+ subvolumes = []
+ for name, mountpoint in config_subvolumes.items():
+ if not name or not mountpoint:
+ continue
+
+ subvolumes.append(Subvolume(name, mountpoint))
+
+ return subvolumes
+
+ @classmethod
+ def parse_arguments(cls, config_subvolumes: Any) -> List['Subvolume']:
+ if isinstance(config_subvolumes, list):
+ return cls._parse(config_subvolumes)
+ elif isinstance(config_subvolumes, dict):
+ return cls._parse_backwards_compatible(config_subvolumes)
+
+ raise ValueError('Unknown disk layout btrfs subvolume format')
diff --git a/archinstall/lib/models/users.py b/archinstall/lib/models/users.py
index 6052b73a..a8feb9ef 100644
--- a/archinstall/lib/models/users.py
+++ b/archinstall/lib/models/users.py
@@ -1,6 +1,8 @@
from dataclasses import dataclass
from typing import Dict, List, Union, Any, TYPE_CHECKING
+from .password_strength import PasswordStrength
+
if TYPE_CHECKING:
_: Any
@@ -25,8 +27,11 @@ class User:
}
def display(self) -> str:
- password = '*' * len(self.password)
- return f'{_("Username")}: {self.username:16} {_("Password")}: {password:16} sudo: {str(self.sudo)}'
+ password = '*' * (len(self.password) if self.password else 0)
+ if password:
+ strength = PasswordStrength.strength(self.password)
+ password += f' ({strength.value})'
+ return f'{_("Username")}: {self.username:16} {_("Password")}: {password:20} sudo: {str(self.sudo)}'
@classmethod
def _parse(cls, config_users: List[Dict[str, Any]]) -> List['User']:
@@ -64,13 +69,13 @@ class User:
) -> List['User']:
users = []
- # backwards compability
+ # backwards compatibility
if isinstance(config_users, dict):
users += cls._parse_backwards_compatible(config_users, False)
else:
users += cls._parse(config_users)
- # backwards compability
+ # backwards compatibility
if isinstance(config_superusers, dict):
users += cls._parse_backwards_compatible(config_superusers, True)
diff --git a/archinstall/lib/networking.py b/archinstall/lib/networking.py
index 3e883a7b..96e8f3a1 100644
--- a/archinstall/lib/networking.py
+++ b/archinstall/lib/networking.py
@@ -4,7 +4,7 @@ import socket
import struct
from typing import Union, Dict, Any, List
-from .exceptions import HardwareIncompatibilityError
+from .exceptions import HardwareIncompatibilityError, SysCallError
from .general import SysCommand
from .output import log
from .pacman import run_pacman
@@ -33,14 +33,17 @@ def list_interfaces(skip_loopback :bool = True) -> Dict[str, str]:
def check_mirror_reachable() -> bool:
log("Testing connectivity to the Arch Linux mirrors ...", level=logging.INFO)
- if run_pacman("-Sy").exit_code == 0:
- return True
-
- elif os.geteuid() != 0:
- log("check_mirror_reachable() uses 'pacman -Sy' which requires root.", level=logging.ERROR, fg="red")
+ try:
+ if run_pacman("-Sy").exit_code == 0:
+ return True
+ elif os.geteuid() != 0:
+ log("check_mirror_reachable() uses 'pacman -Sy' which requires root.", level=logging.ERROR, fg="red")
+ except SysCallError as err:
+ log(err, level=logging.DEBUG)
return False
+
def update_keyring() -> bool:
log("Updating archlinux-keyring ...", level=logging.INFO)
if run_pacman("-Sy --noconfirm archlinux-keyring").exit_code == 0:
diff --git a/archinstall/lib/output.py b/archinstall/lib/output.py
index 29b73bc4..709a7382 100644
--- a/archinstall/lib/output.py
+++ b/archinstall/lib/output.py
@@ -2,43 +2,83 @@ import logging
import os
import sys
from pathlib import Path
-from typing import Dict, Union, List, Any
+from typing import Dict, Union, List, Any, Callable
from .storage import storage
+from dataclasses import asdict, is_dataclass
class FormattedOutput:
@classmethod
- def values(cls, o: Any) -> Dict[str, Any]:
- if hasattr(o, 'json'):
+ def values(cls, o: Any, class_formatter: str = None, filter_list: List[str] = None) -> Dict[str, Any]:
+ """ the original values returned a dataclass as dict thru the call to some specific methods
+ this version allows thru the parameter class_formatter to call a dynamicly selected formatting method.
+ Can transmit a filter list to the class_formatter,
+ """
+ if class_formatter:
+ # if invoked per reference it has to be a standard function or a classmethod.
+ # A method of an instance does not make sense
+ if callable(class_formatter):
+ return class_formatter(o, filter_list)
+ # if is invoked by name we restrict it to a method of the class. No need to mess more
+ elif hasattr(o, class_formatter) and callable(getattr(o, class_formatter)):
+ func = getattr(o, class_formatter)
+ return func(filter_list)
+ # kept as to make it backward compatible
+ elif hasattr(o, 'as_json'):
+ return o.as_json()
+ elif hasattr(o, 'json'):
return o.json()
+ elif is_dataclass(o):
+ return asdict(o)
else:
return o.__dict__
@classmethod
- def as_table(cls, obj: List[Any]) -> str:
+ def as_table(cls, obj: List[Any], class_formatter: Union[str, Callable] = None, filter_list: List[str] = None) -> str:
+ """ variant of as_table (subtly different code) which has two additional parameters
+ filter which is a list of fields which will be shon
+ class_formatter a special method to format the outgoing data
+
+ A general comment, the format selected for the output (a string where every data record is separated by newline)
+ is for compatibility with a print statement
+ As_table_filter can be a drop in replacement for as_table
+ """
+ raw_data = [cls.values(o, class_formatter, filter_list) for o in obj]
+ # determine the maximum column size
column_width: Dict[str, int] = {}
- for o in obj:
- for k, v in cls.values(o).items():
- column_width.setdefault(k, 0)
- column_width[k] = max([column_width[k], len(str(v)), len(k)])
-
+ for o in raw_data:
+ for k, v in o.items():
+ if not filter_list or k in filter_list:
+ column_width.setdefault(k, 0)
+ column_width[k] = max([column_width[k], len(str(v)), len(k)])
+
+ if not filter_list:
+ filter_list = (column_width.keys())
+ # create the header lines
output = ''
- for key, width in column_width.items():
+ key_list = []
+ for key in filter_list:
+ width = column_width[key]
key = key.replace('!', '')
- output += key.ljust(width) + ' | '
-
- output = output[:-3] + '\n'
+ key_list.append(key.ljust(width))
+ output += ' | '.join(key_list) + '\n'
output += '-' * len(output) + '\n'
- for o in obj:
- for k, v in cls.values(o).items():
- if '!' in k:
- v = '*' * len(str(v))
- output += str(v).ljust(column_width[k]) + ' | '
- output = output[:-3]
- output += '\n'
+ # create the data lines
+ for record in raw_data:
+ obj_data = []
+ for key in filter_list:
+ width = column_width.get(key, len(key))
+ value = record.get(key, '')
+ if '!' in key:
+ value = '*' * width
+ if isinstance(value,(int, float)) or (isinstance(value, str) and value.isnumeric()):
+ obj_data.append(str(value).rjust(width))
+ else:
+ obj_data.append(str(value).ljust(width))
+ output += ' | '.join(obj_data) + '\n'
return output
diff --git a/archinstall/lib/plugins.py b/archinstall/lib/plugins.py
index 99e3811c..0ff63610 100644
--- a/archinstall/lib/plugins.py
+++ b/archinstall/lib/plugins.py
@@ -60,7 +60,7 @@ def import_via_path(path :str, namespace :Optional[str] = None) -> ModuleType:
log(f"The above error was detected when loading the plugin: {path}", fg="red", level=logging.ERROR)
try:
- del(sys.modules[namespace])
+ del(sys.modules[namespace]) # noqa: E275
except:
pass
@@ -73,6 +73,7 @@ def find_nth(haystack :List[str], needle :str, n :int) -> int:
def load_plugin(path :str) -> ModuleType:
parsed_url = urllib.parse.urlparse(path)
+ log(f"Loading plugin {parsed_url}.", fg="gray", level=logging.INFO)
# The Profile was not a direct match on a remote URL
if not parsed_url.scheme:
@@ -96,6 +97,7 @@ def load_plugin(path :str) -> ModuleType:
if hasattr(sys.modules[namespace], 'Plugin'):
try:
plugins[namespace] = sys.modules[namespace].Plugin()
+ log(f"Plugin {plugins[namespace]} has been loaded.", fg="gray", level=logging.INFO)
except Exception as err:
log(err, level=logging.ERROR)
log(f"The above error was detected when initiating the plugin: {path}", fg="red", level=logging.ERROR)
diff --git a/archinstall/lib/storage.py b/archinstall/lib/storage.py
index dd7ddc88..8c358161 100644
--- a/archinstall/lib/storage.py
+++ b/archinstall/lib/storage.py
@@ -17,13 +17,13 @@ storage: Dict[str, Any] = {
# os.path.abspath(f'{os.path.dirname(__file__)}/../examples')
],
'UPSTREAM_URL': 'https://raw.githubusercontent.com/archlinux/archinstall/master/profiles',
- 'PROFILE_DB': None, # Used in cases when listing profiles is desired, not mandatory for direct profile grabing.
+ 'PROFILE_DB': None, # Used in cases when listing profiles is desired, not mandatory for direct profile grabbing.
'LOG_PATH': '/var/log/archinstall',
'LOG_FILE': 'install.log',
'MOUNT_POINT': '/mnt/archinstall',
'ENC_IDENTIFIER': 'ainst',
'DISK_TIMEOUTS' : 1, # seconds
'DISK_RETRY_ATTEMPTS' : 5, # RETRY_ATTEMPTS * DISK_TIMEOUTS is used in disk operations
- 'CMD_LOCALE':{'LC_ALL':'C'}, # default locale for execution commands. Can be overriden with set_cmd_locale()
+ 'CMD_LOCALE':{'LC_ALL':'C'}, # default locale for execution commands. Can be overridden with set_cmd_locale()
'CMD_LOCALE_DEFAULT':{'LC_ALL':'C'}, # should be the same as the former. Not be used except in reset_cmd_locale()
}
diff --git a/archinstall/lib/systemd.py b/archinstall/lib/systemd.py
index 3d2f0385..f459f94b 100644
--- a/archinstall/lib/systemd.py
+++ b/archinstall/lib/systemd.py
@@ -88,7 +88,7 @@ class Boot:
if len(args) >= 2 and args[1]:
log(args[1], level=logging.ERROR, fg='red')
- log(f"The error above occured in a temporary boot-up of the installation {self.instance}", level=logging.ERROR, fg="red")
+ log(f"The error above occurred in a temporary boot-up of the installation {self.instance}", level=logging.ERROR, fg="red")
shutdown = None
shutdown_exit_code = -1
diff --git a/archinstall/lib/translation.py b/archinstall/lib/translation.py
deleted file mode 100644
index 1a0e94e4..00000000
--- a/archinstall/lib/translation.py
+++ /dev/null
@@ -1,112 +0,0 @@
-from __future__ import annotations
-
-import json
-import os
-import gettext
-
-from pathlib import Path
-from typing import List, Dict, Any, TYPE_CHECKING, Tuple
-from .exceptions import TranslationError
-
-if TYPE_CHECKING:
- _: Any
-
-
-class LanguageDefinitions:
- def __init__(self):
- self._mappings = self._get_language_mappings()
-
- def _get_language_mappings(self) -> List[Dict[str, str]]:
- locales_dir = Translation.get_locales_dir()
- languages = Path.joinpath(locales_dir, 'languages.json')
-
- with open(languages, 'r') as fp:
- return json.load(fp)
-
- def get_language(self, abbr: str) -> str:
- for entry in self._mappings:
- if entry['abbr'] == abbr:
- return entry['lang']
-
- raise ValueError(f'No language with abbrevation "{abbr}" found')
-
-
-class DeferredTranslation:
- def __init__(self, message: str):
- self.message = message
-
- def __len__(self) -> int:
- return len(self.message)
-
- def __str__(self) -> str:
- translate = _
- if translate is DeferredTranslation:
- return self.message
- return translate(self.message)
-
- def __lt__(self, other) -> bool:
- return self.message < other
-
- def __gt__(self, other) -> bool:
- return self.message > other
-
- def __add__(self, other) -> DeferredTranslation:
- if isinstance(other, str):
- other = DeferredTranslation(other)
-
- concat = self.message + other.message
- return DeferredTranslation(concat)
-
- def format(self, *args) -> str:
- return self.message.format(*args)
-
- @classmethod
- def install(cls):
- import builtins
- builtins._ = cls
-
-
-class Translation:
- def __init__(self, locales_dir):
- self._languages = {}
-
- for names in self._get_translation_lang():
- try:
- self._languages[names[0]] = gettext.translation('base', localedir=locales_dir, languages=names)
- except FileNotFoundError as error:
- raise TranslationError(f"Could not locate language file for '{names}': {error}")
-
- def activate(self, name):
- if language := self._languages.get(name, None):
- language.install()
- else:
- raise ValueError(f'Language not supported: {name}')
-
- @classmethod
- def load_nationalization(cls) -> Translation:
- locales_dir = cls.get_locales_dir()
- return Translation(locales_dir)
-
- @classmethod
- def get_locales_dir(cls) -> Path:
- cur_path = Path(__file__).parent.parent
- locales_dir = Path.joinpath(cur_path, 'locales')
- return locales_dir
-
- @classmethod
- def _defined_languages(cls) -> List[str]:
- locales_dir = cls.get_locales_dir()
- filenames = os.listdir(locales_dir)
- return list(filter(lambda x: len(x) == 2, filenames))
-
- @classmethod
- def _get_translation_lang(cls) -> List[Tuple[str, str]]:
- def_languages = cls._defined_languages()
- languages = LanguageDefinitions()
- return [(languages.get_language(lang), lang) for lang in def_languages]
-
- @classmethod
- def get_available_lang(cls) -> List[str]:
- def_languages = cls._defined_languages()
- languages = LanguageDefinitions()
- return [languages.get_language(lang) for lang in def_languages]
diff --git a/archinstall/lib/translationhandler.py b/archinstall/lib/translationhandler.py
new file mode 100644
index 00000000..ef33b8ec
--- /dev/null
+++ b/archinstall/lib/translationhandler.py
@@ -0,0 +1,230 @@
+from __future__ import annotations
+
+import json
+import logging
+import os
+import gettext
+from dataclasses import dataclass
+
+from pathlib import Path
+from typing import List, Dict, Any, TYPE_CHECKING, Optional
+from .exceptions import TranslationError
+
+if TYPE_CHECKING:
+ _: Any
+
+
+@dataclass
+class Language:
+ abbr: str
+ name_en: str
+ translation: gettext.NullTranslations
+ translation_percent: int
+ translated_lang: Optional[str]
+ external_dep: Optional[str]
+
+ @property
+ def display_name(self) -> str:
+ if not self.external_dep and self.translated_lang:
+ name = self.translated_lang
+ else:
+ name = self.name_en
+
+ return f'{name} ({self.translation_percent}%)'
+
+ def is_match(self, lang_or_translated_lang: str) -> bool:
+ if self.name_en == lang_or_translated_lang:
+ return True
+ elif self.translated_lang == lang_or_translated_lang:
+ return True
+ return False
+
+ def json(self) -> str:
+ return self.name_en
+
+
+class TranslationHandler:
+ def __init__(self):
+ self._base_pot = 'base.pot'
+ self._languages = 'languages.json'
+
+ # check if a custom font was provided, otherwise we'll
+ # use one that can display latin, greek, cyrillic characters
+ if self.is_custom_font_enabled():
+ self._set_font(self.custom_font_path().name)
+ else:
+ self._set_font('LatGrkCyr-8x16')
+
+ self._total_messages = self._get_total_active_messages()
+ self._translated_languages = self._get_translations()
+
+ @classmethod
+ def custom_font_path(cls) -> Path:
+ return Path('/usr/share/kbd/consolefonts/archinstall_font.psfu.gz')
+
+ @classmethod
+ def is_custom_font_enabled(cls) -> bool:
+ return cls.custom_font_path().exists()
+
+ @property
+ def translated_languages(self) -> List[Language]:
+ return self._translated_languages
+
+ def _get_translations(self) -> List[Language]:
+ """
+ Load all translated languages and return a list of such
+ """
+ mappings = self._load_language_mappings()
+ defined_languages = self._provided_translations()
+
+ languages = []
+
+ for short_form in defined_languages:
+ mapping_entry: Dict[str, Any] = next(filter(lambda x: x['abbr'] == short_form, mappings))
+ abbr = mapping_entry['abbr']
+ lang = mapping_entry['lang']
+ translated_lang = mapping_entry.get('translated_lang', None)
+ external_dep = mapping_entry.get('external_dep', False)
+
+ try:
+ # get a translation for a specific language
+ translation = gettext.translation('base', localedir=self._get_locales_dir(), languages=(abbr, lang))
+
+ # calculate the percentage of total translated text to total number of messages
+ if abbr == 'en':
+ percent = 100
+ else:
+ num_translations = self._get_catalog_size(translation)
+ percent = int((num_translations / self._total_messages) * 100)
+ # prevent cases where the .pot file is out of date and the percentage is above 100
+ percent = min(100, percent)
+
+ language = Language(abbr, lang, translation, percent, translated_lang, external_dep)
+ languages.append(language)
+ except FileNotFoundError as error:
+ raise TranslationError(f"Could not locate language file for '{lang}': {error}")
+
+ return languages
+
+ def _set_font(self, font: str):
+ """
+ Set the provided font as the new terminal font
+ """
+ from archinstall import SysCommand, log
+ try:
+ log(f'Setting font: {font}', level=logging.DEBUG)
+ SysCommand(f'setfont {font}')
+ except Exception:
+ log(f'Unable to set font {font}', level=logging.ERROR)
+
+ def _load_language_mappings(self) -> List[Dict[str, Any]]:
+ """
+ Load the mapping table of all known languages
+ """
+ locales_dir = self._get_locales_dir()
+ languages = Path.joinpath(locales_dir, self._languages)
+
+ with open(languages, 'r') as fp:
+ return json.load(fp)
+
+ def _get_catalog_size(self, translation: gettext.NullTranslations) -> int:
+ """
+ Get the number of translated messages for a translations
+ """
+ # this is a very naughty way of retrieving the data but
+ # there's no alternative method exposed unfortunately
+ catalog = translation._catalog # type: ignore
+ messages = {k: v for k, v in catalog.items() if k and v}
+ return len(messages)
+
+ def _get_total_active_messages(self) -> int:
+ """
+ Get total messages that could be translated
+ """
+ locales = self._get_locales_dir()
+ with open(f'{locales}/{self._base_pot}', 'r') as fp:
+ lines = fp.readlines()
+ msgid_lines = [line for line in lines if 'msgid' in line]
+
+ return len(msgid_lines) - 1 # don't count the first line which contains the metadata
+
+ def get_language_by_name(self, name: str) -> Language:
+ """
+ Get a language object by it's name, e.g. English
+ """
+ try:
+ return next(filter(lambda x: x.name_en == name, self._translated_languages))
+ except Exception:
+ raise ValueError(f'No language with name found: {name}')
+
+ def get_language_by_abbr(self, abbr: str) -> Language:
+ """
+ Get a language object by its abbrevation, e.g. en
+ """
+ try:
+ return next(filter(lambda x: x.abbr == abbr, self._translated_languages))
+ except Exception:
+ raise ValueError(f'No language with abbreviation "{abbr}" found')
+
+ def activate(self, language: Language):
+ """
+ Set the provided language as the current translation
+ """
+ language.translation.install()
+
+ def _get_locales_dir(self) -> Path:
+ """
+ Get the locales directory path
+ """
+ cur_path = Path(__file__).parent.parent
+ locales_dir = Path.joinpath(cur_path, 'locales')
+ return locales_dir
+
+ def _provided_translations(self) -> List[str]:
+ """
+ Get a list of all known languages
+ """
+ locales_dir = self._get_locales_dir()
+ filenames = os.listdir(locales_dir)
+
+ translation_files = []
+ for filename in filenames:
+ if len(filename) == 2 or filename == 'pt_BR':
+ translation_files.append(filename)
+
+ return translation_files
+
+
+class DeferredTranslation:
+ def __init__(self, message: str):
+ self.message = message
+
+ def __len__(self) -> int:
+ return len(self.message)
+
+ def __str__(self) -> str:
+ translate = _
+ if translate is DeferredTranslation:
+ return self.message
+ return translate(self.message)
+
+ def __lt__(self, other) -> bool:
+ return self.message < other
+
+ def __gt__(self, other) -> bool:
+ return self.message > other
+
+ def __add__(self, other) -> DeferredTranslation:
+ if isinstance(other, str):
+ other = DeferredTranslation(other)
+
+ concat = self.message + other.message
+ return DeferredTranslation(concat)
+
+ def format(self, *args) -> str:
+ return self.message.format(*args)
+
+ @classmethod
+ def install(cls):
+ import builtins
+ builtins._ = cls
diff --git a/archinstall/lib/user_interaction/__init__.py b/archinstall/lib/user_interaction/__init__.py
index 8aba4b4d..a1ca2652 100644
--- a/archinstall/lib/user_interaction/__init__.py
+++ b/archinstall/lib/user_interaction/__init__.py
@@ -7,6 +7,6 @@ from .network_conf import ask_to_configure_network
from .partitioning_conf import select_partition, select_encrypted_partitions
from .general_conf import (ask_ntp, ask_for_a_timezone, ask_for_audio_selection, select_language, select_mirror_regions,
select_profile, select_archinstall_language, ask_additional_packages_to_install,
- select_additional_repositories, ask_hostname)
+ select_additional_repositories, ask_hostname, add_number_of_parrallel_downloads)
from .disk_conf import ask_for_main_filesystem_format, select_individual_blockdevice_usage, select_disk_layout, select_disk
from .utils import get_password, do_countdown
diff --git a/archinstall/lib/user_interaction/backwards_compatible_conf.py b/archinstall/lib/user_interaction/backwards_compatible_conf.py
index d91690eb..296572d2 100644
--- a/archinstall/lib/user_interaction/backwards_compatible_conf.py
+++ b/archinstall/lib/user_interaction/backwards_compatible_conf.py
@@ -40,7 +40,7 @@ def generic_select(
# We check that the options are iterable. If not we abort. Else we copy them to lists
# it options is a dictionary we use the values as entries of the list
# if options is a string object, each character becomes an entry
- # if options is a list, we implictily build a copy to mantain immutability
+ # if options is a list, we implictily build a copy to maintain immutability
if not isinstance(p_options, Iterable):
log(f"Objects of type {type(p_options)} is not iterable, and are not supported at generic_select", fg="red")
log(f"invalid parameter at Menu() call was at <{sys._getframe(1).f_code.co_name}>", level=logging.WARNING)
diff --git a/archinstall/lib/user_interaction/disk_conf.py b/archinstall/lib/user_interaction/disk_conf.py
index 371d052f..b5ed6967 100644
--- a/archinstall/lib/user_interaction/disk_conf.py
+++ b/archinstall/lib/user_interaction/disk_conf.py
@@ -45,8 +45,8 @@ def select_disk_layout(preset: Optional[Dict[str, Any]], block_devices: list, ad
choice = Menu(
_('Select what you wish to do with the selected block devices'),
modes,
- explode_on_interrupt=True,
- explode_warning=warning
+ raise_error_on_interrupt=True,
+ raise_error_warning_msg=warning
).run()
match choice.type_:
diff --git a/archinstall/lib/user_interaction/general_conf.py b/archinstall/lib/user_interaction/general_conf.py
index d4dc60db..6365014d 100644
--- a/archinstall/lib/user_interaction/general_conf.py
+++ b/archinstall/lib/user_interaction/general_conf.py
@@ -1,10 +1,9 @@
from __future__ import annotations
import logging
+import pathlib
from typing import List, Any, Optional, Dict, TYPE_CHECKING
-import archinstall
-
from ..menu.menu import MenuSelectionType
from ..menu.text_input import TextInput
@@ -14,9 +13,11 @@ from ..output import log
from ..profiles import Profile, list_profiles
from ..mirrors import list_mirrors
-from ..translation import Translation
+from ..translationhandler import Language, TranslationHandler
from ..packages.packages import validate_package_list
+from ..storage import storage
+
if TYPE_CHECKING:
_: Any
@@ -109,7 +110,7 @@ def select_mirror_regions(preset_values: Dict[str, Any] = {}) -> Dict[str, Any]:
list(mirrors.keys()),
preset_values=preselected,
multi=True,
- explode_on_interrupt=True
+ raise_error_on_interrupt=True
).run()
match selected_mirror.type_:
@@ -118,10 +119,40 @@ def select_mirror_regions(preset_values: Dict[str, Any] = {}) -> Dict[str, Any]:
case _: return {selected: mirrors[selected] for selected in selected_mirror.value}
-def select_archinstall_language(default='English'):
- languages = Translation.get_available_lang()
- language = Menu(_('Archinstall language'), languages, default_option=default).run()
- return language
+def select_archinstall_language(languages: List[Language], preset_value: Language) -> Language:
+ # these are the displayed language names which can either be
+ # the english name of a language or, if present, the
+ # name of the language in its own language
+ options = {lang.display_name: lang for lang in languages}
+
+ def dependency_preview(current_selection: str) -> Optional[str]:
+ current_lang = options[current_selection]
+
+ if current_lang.external_dep and not TranslationHandler.is_custom_font_enabled():
+ font_file = TranslationHandler.custom_font_path()
+ text = str(_('To be able to use this translation, please install a font manually that supports the language.')) + '\n'
+ text += str(_('The font should be stored as {}')).format(font_file)
+ return text
+ return None
+
+ choice = Menu(
+ _('Archinstall language'),
+ list(options.keys()),
+ default_option=preset_value.display_name,
+ preview_command=lambda x: dependency_preview(x),
+ preview_size=0.5
+ ).run()
+
+ match choice.type_:
+ case MenuSelectionType.Esc:
+ return preset_value
+ case MenuSelectionType.Selection:
+ language: Language = options[choice.value]
+ # we have to make sure that the proper AUR dependency is
+ # present to be able to use this language
+ if not language.external_dep or TranslationHandler.is_custom_font_enabled():
+ return language
+ return select_archinstall_language(languages, preset_value)
def select_profile(preset) -> Optional[Profile]:
@@ -147,19 +178,19 @@ def select_profile(preset) -> Optional[Profile]:
selection = Menu(
title=title,
p_options=list(options.keys()),
- explode_on_interrupt=True,
- explode_warning=warning
+ raise_error_on_interrupt=True,
+ raise_error_warning_msg=warning
).run()
match selection.type_:
case MenuSelectionType.Selection:
return options[selection.value] if selection.value is not None else None
case MenuSelectionType.Ctrl_c:
- archinstall.storage['profile_minimal'] = False
- archinstall.storage['_selected_servers'] = []
- archinstall.storage['_desktop_profile'] = None
- archinstall.arguments['desktop-environment'] = None
- archinstall.arguments['gfx_driver_packages'] = None
+ storage['profile_minimal'] = False
+ storage['_selected_servers'] = []
+ storage['_desktop_profile'] = None
+ storage['arguments']['desktop-environment'] = None
+ storage['arguments']['gfx_driver_packages'] = None
return None
case MenuSelectionType.Esc:
return None
@@ -172,27 +203,61 @@ def ask_additional_packages_to_install(pre_set_packages: List[str] = []) -> List
def read_packages(already_defined: list = []) -> list:
display = ' '.join(already_defined)
- input_packages = TextInput(_('Write additional packages to install (space separated, leave blank to skip): '), display).run()
- return input_packages.split(' ') if input_packages else []
+ input_packages = TextInput(_('Write additional packages to install (space separated, leave blank to skip): '), display).run().strip()
+ return input_packages.split() if input_packages else []
pre_set_packages = pre_set_packages if pre_set_packages else []
packages = read_packages(pre_set_packages)
- while True:
- if len(packages):
- # Verify packages that were given
- print(_("Verifying that additional packages exist (this might take a few seconds)"))
- valid, invalid = validate_package_list(packages)
+ if not storage['arguments']['offline'] and not storage['arguments']['no_pkg_lookups']:
+ while True:
+ if len(packages):
+ # Verify packages that were given
+ print(_("Verifying that additional packages exist (this might take a few seconds)"))
+ valid, invalid = validate_package_list(packages)
- if invalid:
- log(f"Some packages could not be found in the repository: {invalid}", level=logging.WARNING, fg='red')
- packages = read_packages(valid)
- continue
- break
+ if invalid:
+ log(f"Some packages could not be found in the repository: {invalid}", level=logging.WARNING, fg='red')
+ packages = read_packages(valid)
+ continue
+ break
return packages
+def add_number_of_parrallel_downloads(input_number :Optional[int] = None) -> Optional[int]:
+ max_downloads = 5
+ print(_(f"This option enables the number of parallel downloads that can occur during installation"))
+ print(_(f"Enter the number of parallel downloads to be enabled.\n (Enter a value between 1 to {max_downloads})\nNote:"))
+ print(_(f" - Maximum value : {max_downloads} ( Allows {max_downloads} parallel downloads, allows {max_downloads+1} downloads at a time )"))
+ print(_(f" - Minimum value : 1 ( Allows 1 parallel download, allows 2 downloads at a time )"))
+ print(_(f" - Disable/Default : 0 ( Disables parallel downloading, allows only 1 download at a time )"))
+
+ while True:
+ try:
+ input_number = int(TextInput(_("[Default value: 0] > ")).run().strip() or 0)
+ if input_number <= 0:
+ input_number = 0
+ elif input_number > max_downloads:
+ input_number = max_downloads
+ break
+ except:
+ print(_(f"Invalid input! Try again with a valid input [1 to {max_downloads}, or 0 to disable]"))
+
+ pacman_conf_path = pathlib.Path("/etc/pacman.conf")
+ with pacman_conf_path.open() as f:
+ pacman_conf = f.read().split("\n")
+
+ with pacman_conf_path.open("w") as fwrite:
+ for line in pacman_conf:
+ if "ParallelDownloads" in line:
+ fwrite.write(f"ParallelDownloads = {input_number+1}\n") if not input_number == 0 else fwrite.write("#ParallelDownloads = 0\n")
+ else:
+ fwrite.write(f"{line}\n")
+
+ return input_number
+
+
def select_additional_repositories(preset: List[str]) -> List[str]:
"""
Allows the user to select additional repositories (multilib, and testing) if desired.
@@ -209,7 +274,7 @@ def select_additional_repositories(preset: List[str]) -> List[str]:
sort=False,
multi=True,
preset_values=preset,
- explode_on_interrupt=True
+ raise_error_on_interrupt=True
).run()
match choice.type_:
diff --git a/archinstall/lib/user_interaction/manage_users_conf.py b/archinstall/lib/user_interaction/manage_users_conf.py
index 567a2964..84ce3556 100644
--- a/archinstall/lib/user_interaction/manage_users_conf.py
+++ b/archinstall/lib/user_interaction/manage_users_conf.py
@@ -7,6 +7,7 @@ from .utils import get_password
from ..menu import Menu
from ..menu.list_manager import ListManager
from ..models.users import User
+from ..output import FormattedOutput
if TYPE_CHECKING:
_: Any
@@ -18,56 +19,51 @@ class UserList(ListManager):
"""
def __init__(self, prompt: str, lusers: List[User]):
- """
- param: prompt
- type: str
- param: lusers dict with the users already defined for the system
- type: Dict
- param: sudo. boolean to determine if we handle superusers or users. If None handles both types
- """
self._actions = [
str(_('Add a user')),
str(_('Change password')),
str(_('Promote/Demote user')),
str(_('Delete User'))
]
- super().__init__(prompt, lusers, self._actions, self._actions[0])
+ super().__init__(prompt, lusers, [self._actions[0]], self._actions[1:])
def reformat(self, data: List[User]) -> Dict[str, User]:
- return {e.display(): e for e in data}
+ table = FormattedOutput.as_table(data)
+ rows = table.split('\n')
- def action_list(self):
- active_user = self.target if self.target else None
+ # these are the header rows of the table and do not map to any User obviously
+ # we're adding 2 spaces as prefix because the menu selector '> ' will be put before
+ # the selectable rows so the header has to be aligned
+ display_data = {f' {rows[0]}': None, f' {rows[1]}': None}
- if active_user is None:
- return [self._actions[0]]
- else:
- return self._actions[1:]
+ for row, user in zip(rows[2:], data):
+ row = row.replace('|', '\\|')
+ display_data[row] = user
- def exec_action(self, data: List[User]) -> List[User]:
- if self.target:
- active_user = self.target
- else:
- active_user = None
+ return display_data
- if self.action == self._actions[0]: # add
+ def selected_action_display(self, user: User) -> str:
+ return user.username
+
+ def handle_action(self, action: str, entry: Optional[User], data: List[User]) -> List[User]:
+ if action == self._actions[0]: # add
new_user = self._add_user()
if new_user is not None:
# in case a user with the same username as an existing user
# was created we'll replace the existing one
data = [d for d in data if d.username != new_user.username]
data += [new_user]
- elif self.action == self._actions[1]: # change password
- prompt = str(_('Password for user "{}": ').format(active_user.username))
+ elif action == self._actions[1]: # change password
+ prompt = str(_('Password for user "{}": ').format(entry.username))
new_password = get_password(prompt=prompt)
if new_password:
- user = next(filter(lambda x: x == active_user, data), 1)
+ user = next(filter(lambda x: x == entry, data))
user.password = new_password
- elif self.action == self._actions[2]: # promote/demote
- user = next(filter(lambda x: x == active_user, data), 1)
+ elif action == self._actions[2]: # promote/demote
+ user = next(filter(lambda x: x == entry, data))
user.sudo = False if user.sudo else True
- elif self.action == self._actions[3]: # delete
- data = [d for d in data if d != active_user]
+ elif action == self._actions[3]: # delete
+ data = [d for d in data if d != entry]
return data
@@ -77,8 +73,7 @@ class UserList(ListManager):
return False
def _add_user(self) -> Optional[User]:
- print(_('\nDefine a new user\n'))
- prompt = str(_('Enter username (leave blank to skip): '))
+ prompt = '\n\n' + str(_('Enter username (leave blank to skip): '))
while True:
username = input(prompt).strip(' ')
@@ -94,7 +89,9 @@ class UserList(ListManager):
choice = Menu(
str(_('Should "{}" be a superuser (sudo)?')).format(username), Menu.yes_no(),
skip=False,
- default_option=Menu.no()
+ default_option=Menu.no(),
+ clear_screen=False,
+ show_search_hint=False
).run()
sudo = True if choice.value == Menu.yes() else False
@@ -102,6 +99,5 @@ class UserList(ListManager):
def ask_for_additional_users(prompt: str = '', defined_users: List[User] = []) -> List[User]:
- prompt = prompt if prompt else _('Enter username (leave blank to skip): ')
users = UserList(prompt, defined_users).run()
return users
diff --git a/archinstall/lib/user_interaction/network_conf.py b/archinstall/lib/user_interaction/network_conf.py
index 5154d8b1..557e8ed8 100644
--- a/archinstall/lib/user_interaction/network_conf.py
+++ b/archinstall/lib/user_interaction/network_conf.py
@@ -2,7 +2,7 @@ from __future__ import annotations
import ipaddress
import logging
-from typing import Any, Optional, TYPE_CHECKING, List, Union
+from typing import Any, Optional, TYPE_CHECKING, List, Union, Dict
from ..menu.menu import MenuSelectionType
from ..menu.text_input import TextInput
@@ -10,7 +10,7 @@ from ..models.network_configuration import NetworkConfiguration, NicType
from ..networking import list_interfaces
from ..menu import Menu
-from ..output import log
+from ..output import log, FormattedOutput
from ..menu.list_manager import ListManager
if TYPE_CHECKING:
@@ -19,55 +19,55 @@ if TYPE_CHECKING:
class ManualNetworkConfig(ListManager):
"""
- subclass of ListManager for the managing of network configuration accounts
+ subclass of ListManager for the managing of network configurations
"""
- def __init__(self, prompt: str, ifaces: Union[None, NetworkConfiguration, List[NetworkConfiguration]]):
- """
- param: prompt
- type: str
- param: ifaces already defined previously
- type: Dict
- """
+ def __init__(self, prompt: str, ifaces: List[NetworkConfiguration]):
+ self._actions = [
+ str(_('Add interface')),
+ str(_('Edit interface')),
+ str(_('Delete interface'))
+ ]
- if ifaces is not None and isinstance(ifaces, list):
- display_values = {iface.iface: iface for iface in ifaces}
- else:
- display_values = {}
+ super().__init__(prompt, ifaces, [self._actions[0]], self._actions[1:])
+
+ def reformat(self, data: List[NetworkConfiguration]) -> Dict[str, Optional[NetworkConfiguration]]:
+ table = FormattedOutput.as_table(data)
+ rows = table.split('\n')
- self._action_add = str(_('Add interface'))
- self._action_edit = str(_('Edit interface'))
- self._action_delete = str(_('Delete interface'))
+ # these are the header rows of the table and do not map to any User obviously
+ # we're adding 2 spaces as prefix because the menu selector '> ' will be put before
+ # the selectable rows so the header has to be aligned
+ display_data: Dict[str, Optional[NetworkConfiguration]] = {f' {rows[0]}': None, f' {rows[1]}': None}
- self._iface_actions = [self._action_edit, self._action_delete]
+ for row, iface in zip(rows[2:], data):
+ row = row.replace('|', '\\|')
+ display_data[row] = iface
- super().__init__(prompt, display_values, self._iface_actions, self._action_add)
+ return display_data
- def run_manual(self) -> List[NetworkConfiguration]:
- ifaces = super().run()
- if ifaces is not None:
- return list(ifaces.values())
- return []
+ def selected_action_display(self, iface: NetworkConfiguration) -> str:
+ return iface.iface if iface.iface else ''
- def exec_action(self, data: Any):
- if self.action == self._action_add:
- iface_name = self._select_iface(data.keys())
+ def handle_action(self, action: str, entry: Optional[NetworkConfiguration], data: List[NetworkConfiguration]):
+ if action == self._actions[0]: # add
+ iface_name = self._select_iface(data)
if iface_name:
iface = NetworkConfiguration(NicType.MANUAL, iface=iface_name)
- data[iface_name] = self._edit_iface(iface)
- elif self.target:
- iface_name = list(self.target.keys())[0]
- iface = data[iface_name]
-
- if self.action == self._action_edit:
- data[iface_name] = self._edit_iface(iface)
- elif self.action == self._action_delete:
- del data[iface_name]
+ iface = self._edit_iface(iface)
+ data += [iface]
+ elif entry:
+ if action == self._actions[1]: # edit interface
+ data = [d for d in data if d.iface != entry.iface]
+ data.append(self._edit_iface(entry))
+ elif action == self._actions[2]: # delete
+ data = [d for d in data if d != entry]
return data
- def _select_iface(self, existing_ifaces: List[str]) -> Optional[Any]:
+ def _select_iface(self, data: List[NetworkConfiguration]) -> Optional[Any]:
all_ifaces = list_interfaces().values()
+ existing_ifaces = [d.iface for d in data]
available = set(all_ifaces) - set(existing_ifaces)
choice = Menu(str(_('Select interface to add')), list(available), skip=True).run()
@@ -76,7 +76,7 @@ class ManualNetworkConfig(ListManager):
return choice.value
- def _edit_iface(self, edit_iface :NetworkConfiguration):
+ def _edit_iface(self, edit_iface: NetworkConfiguration):
iface_name = edit_iface.iface
modes = ['DHCP (auto detect)', 'IP (static)']
default_mode = 'DHCP (auto detect)'
@@ -99,11 +99,13 @@ class ManualNetworkConfig(ListManager):
gateway = None
while 1:
- gateway_input = TextInput(_('Enter your gateway (router) IP address or leave blank for none: '),
- edit_iface.gateway).run().strip()
+ gateway = TextInput(
+ _('Enter your gateway (router) IP address or leave blank for none: '),
+ edit_iface.gateway
+ ).run().strip()
try:
- if len(gateway_input) > 0:
- ipaddress.ip_address(gateway_input)
+ if len(gateway) > 0:
+ ipaddress.ip_address(gateway)
break
except ValueError:
log("You need to enter a valid gateway (router) IP address.", level=logging.WARNING, fg='red')
@@ -124,7 +126,9 @@ class ManualNetworkConfig(ListManager):
return NetworkConfiguration(NicType.MANUAL, iface=iface_name)
-def ask_to_configure_network(preset: Union[None, NetworkConfiguration, List[NetworkConfiguration]]) -> Optional[Union[List[NetworkConfiguration], NetworkConfiguration]]:
+def ask_to_configure_network(
+ preset: Union[NetworkConfiguration, List[NetworkConfiguration]]
+) -> Optional[NetworkConfiguration | List[NetworkConfiguration]]:
"""
Configure the network on the newly installed system
"""
@@ -150,8 +154,8 @@ def ask_to_configure_network(preset: Union[None, NetworkConfiguration, List[Netw
list(network_options.values()),
cursor_index=cursor_idx,
sort=False,
- explode_on_interrupt=True,
- explode_warning=warning
+ raise_error_on_interrupt=True,
+ raise_error_warning_msg=warning
).run()
match choice.type_:
@@ -165,7 +169,7 @@ def ask_to_configure_network(preset: Union[None, NetworkConfiguration, List[Netw
elif choice.value == network_options['network_manager']:
return NetworkConfiguration(NicType.NM)
elif choice.value == network_options['manual']:
- manual = ManualNetworkConfig('Configure interfaces', preset)
- return manual.run_manual()
+ preset_ifaces = preset if isinstance(preset, list) else []
+ return ManualNetworkConfig('Configure interfaces', preset_ifaces).run()
return preset
diff --git a/archinstall/lib/user_interaction/partitioning_conf.py b/archinstall/lib/user_interaction/partitioning_conf.py
index bfff5705..f2e6b881 100644
--- a/archinstall/lib/user_interaction/partitioning_conf.py
+++ b/archinstall/lib/user_interaction/partitioning_conf.py
@@ -5,7 +5,7 @@ from typing import List, Any, Dict, Union, TYPE_CHECKING, Callable, Optional
from ..menu import Menu
from ..menu.menu import MenuSelectionType
-from ..output import log
+from ..output import log, FormattedOutput
from ..disk.validators import fs_types
@@ -28,16 +28,31 @@ def current_partition_layout(partitions: List[Dict[str, Any]], with_idx: bool =
pad_right = spaces - pad_left
return f'{pad_right * " "}{name}{pad_left * " "}|'
+ def flatten_data(data: Dict[str, Any]) -> Dict[str, Any]:
+ flattened = {}
+ for k, v in data.items():
+ if k == 'filesystem':
+ flat = flatten_data(v)
+ flattened.update(flat)
+ elif k == 'btrfs':
+ # we're going to create a separate table for the btrfs subvolumes
+ pass
+ else:
+ flattened[k] = v
+ return flattened
+
+ display_data: List[Dict[str, Any]] = [flatten_data(entry) for entry in partitions]
+
column_names = {}
# this will add an initial index to the table for each partition
if with_idx:
- column_names['index'] = max([len(str(len(partitions))), len('index')])
+ column_names['index'] = max([len(str(len(display_data))), len('index')])
# determine all attribute names and the max length
- # of the value among all partitions to know the width
+ # of the value among all display_data to know the width
# of the table cells
- for p in partitions:
+ for p in display_data:
for attribute, value in p.items():
if attribute in column_names.keys():
column_names[attribute] = max([column_names[attribute], len(str(value)), len(attribute)])
@@ -50,7 +65,7 @@ def current_partition_layout(partitions: List[Dict[str, Any]], with_idx: bool =
current_layout = f'{current_layout[:-1]}\n{"-" * len(current_layout)}\n'
- for idx, p in enumerate(partitions):
+ for idx, p in enumerate(display_data):
row = ''
for name, max_len in column_names.items():
if name == 'index':
@@ -62,6 +77,13 @@ def current_partition_layout(partitions: List[Dict[str, Any]], with_idx: bool =
current_layout += f'{row[:-1]}\n'
+ # we'll create a separate table for the btrfs subvolumes
+ btrfs_subvolumes = [partition['btrfs']['subvolumes'] for partition in partitions if partition.get('btrfs', None)]
+ if len(btrfs_subvolumes) > 0:
+ for subvolumes in btrfs_subvolumes:
+ output = FormattedOutput.as_table(subvolumes)
+ current_layout += f'\n{output}'
+
if with_title:
title = str(_('Current partition layout'))
return f'\n\n{title}:\n\n{current_layout}'
@@ -118,23 +140,10 @@ def get_default_partition_layout(
return suggest_multi_disk_layout(block_devices, advanced_options=advanced_options)
-def select_individual_blockdevice_usage(block_devices: list) -> Dict[str, Any]:
- result = {}
-
- for device in block_devices:
- layout = manage_new_and_existing_partitions(device)
- result[device.path] = layout
-
- return result
-
-
def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, Any]: # noqa: max-complexity: 50
block_device_struct = {"partitions": [partition.__dump__() for partition in block_device.partitions.values()]}
original_layout = copy.deepcopy(block_device_struct)
- # Test code: [part.__dump__() for part in block_device.partitions.values()]
- # TODO: Squeeze in BTRFS subvolumes here
-
new_partition = str(_('Create a new partition'))
suggest_partition_layout = str(_('Suggest partition layout'))
delete_partition = str(_('Delete a partition'))
@@ -187,6 +196,7 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str,
return original_layout
elif task == save_and_exit:
break
+
if task == new_partition:
from ..disk import valid_parted_position
@@ -200,8 +210,9 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str,
if fs_choice.type_ == MenuSelectionType.Esc:
continue
- prompt = _('Enter the start sector (percentage or block number, default: {}): ').format(
- block_device.first_free_sector)
+ prompt = str(_('Enter the start sector (percentage or block number, default: {}): ')).format(
+ block_device.first_free_sector
+ )
start = input(prompt).strip()
if not start.strip():
@@ -210,8 +221,9 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str,
else:
end_suggested = '100%'
- prompt = _('Enter the end sector of the partition (percentage or block number, ex: {}): ').format(
- end_suggested)
+ prompt = str(_('Enter the end sector of the partition (percentage or block number, ex: {}): ')).format(
+ end_suggested
+ )
end = input(prompt).strip()
if not end.strip():
@@ -224,7 +236,7 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str,
continue
block_device_struct["partitions"].append({
- "type": "primary", # Strictly only allowed under MSDOS, but GPT accepts it so it's "safe" to inject
+ "type": "primary", # Strictly only allowed under MS-DOS, but GPT accepts it so it's "safe" to inject
"start": start,
"size": end,
"mountpoint": None,
@@ -351,18 +363,16 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str,
if partition is not None:
if not block_device_struct["partitions"][partition].get('btrfs', {}):
block_device_struct["partitions"][partition]['btrfs'] = {}
- if not block_device_struct["partitions"][partition]['btrfs'].get('subvolumes', {}):
- block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = {}
+ if not block_device_struct["partitions"][partition]['btrfs'].get('subvolumes', []):
+ block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = []
prev = block_device_struct["partitions"][partition]['btrfs']['subvolumes']
- result = SubvolumeList(_("Manage btrfs subvolumes for current partition"),prev).run()
- if result:
- block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = result
- else:
- del block_device_struct["partitions"][partition]['btrfs']
+ result = SubvolumeList(_("Manage btrfs subvolumes for current partition"), prev).run()
+ block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = result
return block_device_struct
+
def select_encrypted_partitions(
title :str,
partitions :List[Partition],
@@ -374,11 +384,9 @@ def select_encrypted_partitions(
if len(partition_indexes) == 0:
return None
- title = _('Select which partitions to mark for formatting:')
-
# show current partition layout:
if len(partitions):
- title += current_partition_layout(partitions) + '\n'
+ title += current_partition_layout(partitions, with_idx=True) + '\n'
choice = Menu(title, partition_indexes, multi=multiple).run()
diff --git a/archinstall/lib/user_interaction/subvolume_config.py b/archinstall/lib/user_interaction/subvolume_config.py
index af783639..94150dee 100644
--- a/archinstall/lib/user_interaction/subvolume_config.py
+++ b/archinstall/lib/user_interaction/subvolume_config.py
@@ -1,146 +1,98 @@
-from typing import Dict, List
+from typing import Dict, List, Optional, Any, TYPE_CHECKING
from ..menu.list_manager import ListManager
from ..menu.menu import MenuSelectionType
-from ..menu.selection_menu import Selector, GeneralMenu
from ..menu.text_input import TextInput
from ..menu import Menu
+from ..models.subvolume import Subvolume
+from ... import FormattedOutput
+
+if TYPE_CHECKING:
+ _: Any
-"""
-UI classes
-"""
class SubvolumeList(ListManager):
- def __init__(self,prompt,list):
- self.ObjectNullAction = None # str(_('Add'))
- self.ObjectDefaultAction = str(_('Add'))
- super().__init__(prompt,list,None,self.ObjectNullAction,self.ObjectDefaultAction)
-
- def reformat(self, data: Dict) -> Dict:
- def presentation(key :str, value :Dict):
- text = _(" Subvolume :{:16}").format(key)
- if isinstance(value,str):
- text += _(" mounted at {:16}").format(value)
- else:
- if value.get('mountpoint'):
- text += _(" mounted at {:16}").format(value['mountpoint'])
- else:
- text += (' ' * 28)
-
- if value.get('options',[]):
- text += _(" with option {}").format(', '.join(value['options']))
- return text
-
- formatted = {presentation(k, v): k for k, v in data.items()}
- return {k: v for k, v in sorted(formatted.items(), key=lambda e: e[0])}
-
- def action_list(self):
- return super().action_list()
-
- def exec_action(self, data: Dict):
- if self.target:
- origkey, origval = list(self.target.items())[0]
- else:
- origkey = None
-
- if self.action == str(_('Delete')):
- del data[origkey]
- else:
- if self.action == str(_('Add')):
- self.target = {}
- print(_('\n Fill the desired values for a new subvolume \n'))
- with SubvolumeMenu(self.target,self.action) as add_menu:
- for elem in ['name','mountpoint','options']:
- add_menu.exec_option(elem)
- else:
- SubvolumeMenu(self.target,self.action).run()
-
- data.update(self.target)
+ def __init__(self, prompt: str, subvolumes: List[Subvolume]):
+ self._actions = [
+ str(_('Add subvolume')),
+ str(_('Edit subvolume')),
+ str(_('Delete subvolume'))
+ ]
+ super().__init__(prompt, subvolumes, [self._actions[0]], self._actions[1:])
- return data
+ def reformat(self, data: List[Subvolume]) -> Dict[str, Optional[Subvolume]]:
+ table = FormattedOutput.as_table(data)
+ rows = table.split('\n')
+
+ # these are the header rows of the table and do not map to any User obviously
+ # we're adding 2 spaces as prefix because the menu selector '> ' will be put before
+ # the selectable rows so the header has to be aligned
+ display_data: Dict[str, Optional[Subvolume]] = {f' {rows[0]}': None, f' {rows[1]}': None}
+
+ for row, subvol in zip(rows[2:], data):
+ row = row.replace('|', '\\|')
+ display_data[row] = subvol
+
+ return display_data
+ def selected_action_display(self, subvolume: Subvolume) -> str:
+ return subvolume.name
+
+ def _prompt_options(self, editing: Optional[Subvolume] = None) -> List[str]:
+ preset_options = []
+ if editing:
+ preset_options = editing.options
-class SubvolumeMenu(GeneralMenu):
- def __init__(self,parameters,action=None):
- self.data = parameters
- self.action = action
- self.ds = {}
- self.ds['name'] = None
- self.ds['mountpoint'] = None
- self.ds['options'] = None
- if self.data:
- origkey,origval = list(self.data.items())[0]
- self.ds['name'] = origkey
- if isinstance(origval,str):
- self.ds['mountpoint'] = origval
- else:
- self.ds['mountpoint'] = self.data[origkey].get('mountpoint')
- self.ds['options'] = self.data[origkey].get('options')
-
- super().__init__(data_store=self.ds)
-
- def _setup_selection_menu_options(self):
- # [str(_('Add')),str(_('Copy')),str(_('Edit')),str(_('Delete'))]
- self._menu_options['name'] = Selector(str(_('Subvolume name ')),
- self._select_subvolume_name if not self.action or self.action in (str(_('Add')),str(_('Copy'))) else None,
- mandatory=True,
- enabled=True)
- self._menu_options['mountpoint'] = Selector(str(_('Subvolume mountpoint')),
- self._select_subvolume_mount_point if not self.action or self.action in (str(_('Add')),str(_('Edit'))) else None,
- enabled=True)
- self._menu_options['options'] = Selector(str(_('Subvolume options')),
- self._select_subvolume_options if not self.action or self.action in (str(_('Add')),str(_('Edit'))) else None,
- enabled=True)
- self._menu_options['save'] = Selector(str(_('Save')),
- exec_func=lambda n,v:True,
- enabled=True)
- self._menu_options['cancel'] = Selector(str(_('Cancel')),
- # func = lambda pre:True,
- exec_func=lambda n,v:self.fast_exit(n),
- enabled=True)
- self.cancel_action = 'cancel'
- self.save_action = 'save'
- self.bottom_list = [self.save_action,self.cancel_action]
-
- def fast_exit(self,accion):
- if self.option(accion).get_selection():
- for item in self.list_options():
- if self.option(item).is_mandatory():
- self.option(item).set_mandatory(False)
- return True
-
- def exit_callback(self):
- # we exit without moving data
- if self.option(self.cancel_action).get_selection():
- return
- if not self.ds['name']:
- return
- else:
- key = self.ds['name']
- value = {}
- if self.ds['mountpoint']:
- value['mountpoint'] = self.ds['mountpoint']
- if self.ds['options']:
- value['options'] = self.ds['options']
- self.data.update({key : value})
-
- def _select_subvolume_name(self,value):
- return TextInput(str(_("Subvolume name :")),value).run()
-
- def _select_subvolume_mount_point(self,value):
- return TextInput(str(_("Select a mount point :")),value).run()
-
- def _select_subvolume_options(self,value) -> List[str]:
- # def __init__(self, title, p_options, skip=True, multi=False, default_option=None, sort=True):
choice = Menu(
str(_("Select the desired subvolume options ")),
['nodatacow','compress'],
skip=True,
- preset_values=value,
+ preset_values=preset_options,
multi=True
).run()
if choice.type_ == MenuSelectionType.Selection:
- return choice.value
+ return choice.value # type: ignore
return []
+
+ def _add_subvolume(self, editing: Optional[Subvolume] = None) -> Optional[Subvolume]:
+ name = TextInput(f'\n\n{_("Subvolume name")}: ', editing.name if editing else '').run()
+
+ if not name:
+ return None
+
+ mountpoint = TextInput(f'\n{_("Subvolume mountpoint")}: ', editing.mountpoint if editing else '').run()
+
+ if not mountpoint:
+ return None
+
+ options = self._prompt_options(editing)
+
+ subvolume = Subvolume(name, mountpoint)
+ subvolume.compress = 'compress' in options
+ subvolume.nodatacow = 'nodatacow' in options
+
+ return subvolume
+
+ def handle_action(self, action: str, entry: Optional[Subvolume], data: List[Subvolume]) -> List[Subvolume]:
+ if action == self._actions[0]: # add
+ new_subvolume = self._add_subvolume()
+
+ if new_subvolume is not None:
+ # in case a user with the same username as an existing user
+ # was created we'll replace the existing one
+ data = [d for d in data if d.name != new_subvolume.name]
+ data += [new_subvolume]
+ elif entry is not None:
+ if action == self._actions[1]: # edit subvolume
+ new_subvolume = self._add_subvolume(entry)
+
+ if new_subvolume is not None:
+ # we'll remove the original subvolume and add the modified version
+ data = [d for d in data if d.name != entry.name and d.name != new_subvolume.name]
+ data += [new_subvolume]
+ elif action == self._actions[2]: # delete
+ data = [d for d in data if d != entry]
+
+ return data
diff --git a/archinstall/lib/user_interaction/system_conf.py b/archinstall/lib/user_interaction/system_conf.py
index 78daa6a5..94bbac30 100644
--- a/archinstall/lib/user_interaction/system_conf.py
+++ b/archinstall/lib/user_interaction/system_conf.py
@@ -32,8 +32,8 @@ def select_kernel(preset: List[str] = None) -> List[str]:
sort=True,
multi=True,
preset_values=preset,
- explode_on_interrupt=True,
- explode_warning=warning
+ raise_error_on_interrupt=True,
+ raise_error_warning_msg=warning
).run()
match choice.type_:
@@ -67,8 +67,8 @@ def select_harddrives(preset: List[str] = []) -> List[str]:
list(options.keys()),
preset_values=list(preset_disks.keys()),
multi=True,
- explode_on_interrupt=True,
- explode_warning=warning
+ raise_error_on_interrupt=True,
+ raise_error_warning_msg=warning
).run()
match selected_harddrive.type_:
diff --git a/archinstall/lib/user_interaction/utils.py b/archinstall/lib/user_interaction/utils.py
index fa079bc2..7ee6fc07 100644
--- a/archinstall/lib/user_interaction/utils.py
+++ b/archinstall/lib/user_interaction/utils.py
@@ -7,6 +7,7 @@ import time
from typing import Any, Optional, TYPE_CHECKING
from ..menu import Menu
+from ..models.password_strength import PasswordStrength
from ..output import log
if TYPE_CHECKING:
@@ -16,42 +17,23 @@ if TYPE_CHECKING:
SIG_TRIGGER = None
-def check_password_strong(passwd: str) -> bool:
- symbol_count = 0
- if any(character.isdigit() for character in passwd):
- symbol_count += 10
- if any(character.isupper() for character in passwd):
- symbol_count += 26
- if any(character.islower() for character in passwd):
- symbol_count += 26
- if any(not character.isalnum() for character in passwd):
- symbol_count += 40
-
- if symbol_count**len(passwd) < 10e20:
- prompt = str(_("The password you are using seems to be weak, are you sure you want to use it?"))
- choice = Menu(prompt, Menu.yes_no(), default_option=Menu.yes()).run()
- return choice.value == Menu.yes()
-
- return True
-
-
def get_password(prompt: str = '') -> Optional[str]:
if not prompt:
prompt = _("Enter a password: ")
- while passwd := getpass.getpass(prompt):
- if len(passwd.strip()) <= 0:
+ while password := getpass.getpass(prompt):
+ if len(password.strip()) <= 0:
break
- if not check_password_strong(passwd):
- continue
+ strength = PasswordStrength.strength(password)
+ log(f'Password strength: {strength.value}', fg=strength.color())
passwd_verification = getpass.getpass(prompt=_('And one more time for verification: '))
- if passwd != passwd_verification:
+ if password != passwd_verification:
log(' * Passwords did not match * ', fg='red')
continue
- return passwd
+ return password
return None