Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall
diff options
context:
space:
mode:
authorAnton Hvornum <anton@hvornum.se>2021-03-09 23:39:11 +0100
committerAnton Hvornum <anton@hvornum.se>2021-03-09 23:39:11 +0100
commite2aeb3a32faa397db899f4105a16f31a095387be (patch)
tree007129425274e4efee38622c8cf5f912074a5531 /archinstall
parent48e9f13f0105e449c95e52672d1bf98e6e6b5d65 (diff)
parentcfbaaedb17e9ad6975a5bb37095e2b4ebe8e6797 (diff)
Mergining in PR #112. Reworked partitioning and added new functions
Diffstat (limited to 'archinstall')
-rw-r--r--archinstall/__init__.py17
-rw-r--r--archinstall/lib/disk.py280
-rw-r--r--archinstall/lib/exceptions.py6
-rw-r--r--archinstall/lib/general.py10
-rw-r--r--archinstall/lib/installer.py2
-rw-r--r--archinstall/lib/luks.py60
-rw-r--r--archinstall/lib/output.py11
-rw-r--r--archinstall/lib/profiles.py17
-rw-r--r--archinstall/lib/storage.py3
-rw-r--r--archinstall/lib/user_interaction.py122
10 files changed, 447 insertions, 81 deletions
diff --git a/archinstall/__init__.py b/archinstall/__init__.py
index ee2d0361..d4452d38 100644
--- a/archinstall/__init__.py
+++ b/archinstall/__init__.py
@@ -12,4 +12,19 @@ from .lib.services import *
from .lib.packages import *
from .lib.output import *
from .lib.storage import *
-from .lib.hardware import * \ No newline at end of file
+from .lib.hardware import *
+
+## Basic version of arg.parse() supporting:
+## --key=value
+## --boolean
+arguments = {}
+positionals = []
+for arg in sys.argv[1:]:
+ if '--' == arg[:2]:
+ if '=' in arg:
+ key, val = [x.strip() for x in arg[2:].split('=', 1)]
+ else:
+ key, val = arg[2:], True
+ arguments[key] = val
+ else:
+ positionals.append(arg) \ No newline at end of file
diff --git a/archinstall/lib/disk.py b/archinstall/lib/disk.py
index caf5c4e1..c05ba757 100644
--- a/archinstall/lib/disk.py
+++ b/archinstall/lib/disk.py
@@ -1,4 +1,5 @@
-import glob, re, os, json, time # Time is only used to gracefully wait for new paritions to come online
+import glob, re, os, json, time, hashlib
+import pathlib
from collections import OrderedDict
from .exceptions import DiskError
from .general import *
@@ -7,6 +8,7 @@ from .storage import storage
ROOT_DIR_PATTERN = re.compile('^.*?/devices')
GPT = 0b00000001
+MBR = 0b00000010
#import ctypes
#import ctypes.util
@@ -14,14 +16,27 @@ GPT = 0b00000001
#libc.mount.argtypes = (ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_ulong, ctypes.c_char_p)
class BlockDevice():
- def __init__(self, path, info):
+ def __init__(self, path, info=None):
+ if not info:
+ # If we don't give any information, we need to auto-fill it.
+ # Otherwise any subsequent usage will break.
+ info = all_disks()[path].info
+
self.path = path
self.info = info
self.part_cache = OrderedDict()
+ # TODO: Currently disk encryption is a BIT missleading.
+ # It's actually partition-encryption, but for future-proofing this
+ # I'm placing the encryption password on a BlockDevice level.
+ self.encryption_passwoed = None
def __repr__(self, *args, **kwargs):
return f"BlockDevice({self.device})"
+ def __iter__(self):
+ for partition in self.partitions:
+ yield self.partitions[partition]
+
def __getitem__(self, key, *args, **kwargs):
if key not in self.info:
raise KeyError(f'{self} does not contain information: "{key}"')
@@ -91,7 +106,8 @@ class BlockDevice():
part_id = part['name'][len(os.path.basename(self.path)):]
if part_id not in self.part_cache:
## TODO: Force over-write even if in cache?
- self.part_cache[part_id] = Partition(root_path + part_id, part_id=part_id, size=part['size'])
+ if part_id not in self.part_cache or self.part_cache[part_id].size != part['size']:
+ self.part_cache[part_id] = Partition(root_path + part_id, part_id=part_id, size=part['size'])
return {k: self.part_cache[k] for k in sorted(self.part_cache)}
@@ -100,50 +116,177 @@ class BlockDevice():
all_partitions = self.partitions
return [all_partitions[k] for k in all_partitions]
+ @property
+ def partition_table_type(self):
+ return GPT
+
+ def has_partitions(self):
+ return len(self.partitions)
+
+ def has_mount_point(self, mountpoint):
+ for partition in self.partitions:
+ if self.partitions[partition].mountpoint == mountpoint:
+ return True
+ return False
class Partition():
- def __init__(self, path, part_id=None, size=-1, filesystem=None, mountpoint=None, encrypted=False):
+ def __init__(self, path, part_id=None, size=-1, filesystem=None, mountpoint=None, encrypted=False, autodetect_filesystem=True):
if not part_id:
part_id = os.path.basename(path)
self.path = path
self.part_id = part_id
self.mountpoint = mountpoint
- self.filesystem = filesystem # TODO: Autodetect if we're reusing a partition
+ self.target_mountpoint = mountpoint
+ self.filesystem = filesystem
self.size = size # TODO: Refresh?
self.encrypted = encrypted
+ self.allow_formatting = False # A fail-safe for unconfigured partitions, such as windows NTFS partitions.
+
+ if mountpoint:
+ self.mount(mountpoint)
+
+ mount_information = get_mount_info(self.path)
+
+ if self.mountpoint != mount_information.get('target', None) and mountpoint:
+ raise DiskError(f"{self} was given a mountpoint but the actual mountpoint differs: {mount_information.get('target', None)}")
+
+ if (target := mount_information.get('target', None)):
+ self.mountpoint = target
+
+ if not self.filesystem and autodetect_filesystem:
+ if (fstype := mount_information.get('fstype', get_filesystem_type(self.real_device))):
+ self.filesystem = fstype
+
+ if self.filesystem == 'crypto_LUKS':
+ self.encrypted = True
+
+ def __lt__(self, left_comparitor):
+ if type(left_comparitor) == Partition:
+ left_comparitor = left_comparitor.path
+ else:
+ left_comparitor = str(left_comparitor)
+ return self.path < left_comparitor # Not quite sure the order here is correct. But /dev/nvme0n1p1 comes before /dev/nvme0n1p5 so seems correct.
def __repr__(self, *args, **kwargs):
+ mount_repr = ''
+ if self.mountpoint:
+ mount_repr = f", mounted={self.mountpoint}"
+ elif self.target_mountpoint:
+ mount_repr = f", rel_mountpoint={self.target_mountpoint}"
+
if self.encrypted:
- return f'Partition(path={self.path}, real_device={self.real_device}, fs={self.filesystem}, mounted={self.mountpoint})'
+ return f'Partition(path={self.path}, real_device={self.real_device}, fs={self.filesystem}{mount_repr})'
+ else:
+ return f'Partition(path={self.path}, fs={self.filesystem}{mount_repr})'
+
+ @property
+ def real_device(self):
+ if not self.encrypted:
+ return self.path
else:
- return f'Partition(path={self.path}, fs={self.filesystem}, mounted={self.mountpoint})'
+ for blockdevice in json.loads(b''.join(sys_command('lsblk -J')).decode('UTF-8'))['blockdevices']:
+ if (parent := self.find_parent_of(blockdevice, os.path.basename(self.path))):
+ return f"/dev/{parent}"
+ raise DiskError(f'Could not find appropriate parent for encrypted partition {self}')
+
+ def detect_inner_filesystem(self, password):
+ log(f'Trying to detect inner filesystem format on {self} (This might take a while)', level=LOG_LEVELS.Info)
+ from .luks import luks2
+ with luks2(self, 'luksloop', password, auto_unmount=True) as unlocked_device:
+ return unlocked_device.filesystem
+
+ def has_content(self):
+ temporary_mountpoint = '/tmp/'+hashlib.md5(bytes(f"{time.time()}", 'UTF-8')+os.urandom(12)).hexdigest()
+ temporary_path = pathlib.Path(temporary_mountpoint)
+
+ temporary_path.mkdir(parents=True, exist_ok=True)
+ if (handle := sys_command(f'/usr/bin/mount {self.path} {temporary_mountpoint}')).exit_code != 0:
+ raise DiskError(f'Could not mount and check for content on {self.path} because: {b"".join(handle)}')
+
+ files = len(glob.glob(f"{temporary_mountpoint}/*"))
+ sys_command(f'/usr/bin/umount {temporary_mountpoint}')
+
+ temporary_path.rmdir()
+
+ return True if files > 0 else False
+
+ def safe_to_format(self):
+ if self.allow_formatting is False:
+ return False
+ elif self.target_mountpoint == '/boot' and self.has_content():
+ return False
+
+ return True
+
+ def encrypt(self, *args, **kwargs):
+ """
+ A wrapper function for luks2() instances and the .encrypt() method of that instance.
+ """
+ from .luks import luks2
+
+ if not self.encrypted:
+ raise DiskError(f"Attempting to encrypt a partition that was not marked for encryption: {self}")
+
+ if not self.safe_to_format():
+ return False
+
+ handle = luks2(self, None, None)
+ return handle.encrypt(self, *args, **kwargs)
+
+ def format(self, filesystem=None, path=None, allow_formatting=None, log_formating=True):
+ """
+ Format can be given an overriding path, for instance /dev/null to test
+ the formating functionality and in essence the support for the given filesystem.
+ """
+ if filesystem is None:
+ filesystem = self.filesystem
+
+ if path is None:
+ path = self.path
+ if allow_formatting is None:
+ allow_formatting = self.allow_formatting
+
+ if not allow_formatting:
+ raise PermissionError(f"{self} is not formatable either because instance is locked ({self.allow_formatting}) or a blocking flag was given ({allow_formatting})")
+
+ if log_formating:
+ log(f'Formatting {path} -> {filesystem}', level=LOG_LEVELS.Info)
- def format(self, filesystem):
- log(f'Formatting {self} -> {filesystem}', level=LOG_LEVELS.Info)
if filesystem == 'btrfs':
- o = b''.join(sys_command(f'/usr/bin/mkfs.btrfs -f {self.path}'))
+ o = b''.join(sys_command(f'/usr/bin/mkfs.btrfs -f {path}'))
if b'UUID' not in o:
- raise DiskError(f'Could not format {self.path} with {filesystem} because: {o}')
+ raise DiskError(f'Could not format {path} with {filesystem} because: {o}')
self.filesystem = 'btrfs'
- elif filesystem == 'fat32':
- o = b''.join(sys_command(f'/usr/bin/mkfs.vfat -F32 {self.path}'))
+
+ elif filesystem == 'vfat':
+ o = b''.join(sys_command(f'/usr/bin/mkfs.vfat -F32 {path}'))
if (b'mkfs.fat' not in o and b'mkfs.vfat' not in o) or b'command not found' in o:
- raise DiskError(f'Could not format {self.path} with {filesystem} because: {o}')
- self.filesystem = 'fat32'
+ raise DiskError(f'Could not format {path} with {filesystem} because: {o}')
+ self.filesystem = 'vfat'
+
elif filesystem == 'ext4':
- if (handle := sys_command(f'/usr/bin/mkfs.ext4 -F {self.path}')).exit_code != 0:
- raise DiskError(f'Could not format {self.path} with {filesystem} because: {b"".join(handle)}')
+ if (handle := sys_command(f'/usr/bin/mkfs.ext4 -F {path}')).exit_code != 0:
+ raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}')
self.filesystem = 'ext4'
+
elif filesystem == 'xfs':
- if (handle:= sys_command(f'/usr/bin/mkfs.xfs -f {self.path}')).exit_code != 0:
- raise DiskError(f'Could not format {self.path} with {filesystem} because: {b"".join(handle)}')
+ if (handle := sys_command(f'/usr/bin/mkfs.xfs -f {path}')).exit_code != 0:
+ raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}')
self.filesystem = 'xfs'
+
elif filesystem == 'f2fs':
- if (handle:= sys_command(f'/usr/bin/mkfs.f2fs -f {self.path}')).exit_code != 0:
- raise DiskError(f'Could not format {self.path} with {filesystem} because: {b"".join(handle)}')
+ if (handle := sys_command(f'/usr/bin/mkfs.f2fs -f {path}')).exit_code != 0:
+ raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}')
self.filesystem = 'f2fs'
+
+ elif filesystem == 'crypto_LUKS':
+ # from .luks import luks2
+ # encrypted_partition = luks2(self, None, None)
+ # encrypted_partition.format(path)
+ self.filesystem = 'crypto_LUKS'
+
else:
- raise DiskError(f'Fileformat {filesystem} is not yet implemented.')
+ raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.")
return True
def find_parent_of(self, data, name, parent=None):
@@ -154,16 +297,6 @@ class Partition():
if (parent := self.find_parent_of(child, name, parent=data['name'])):
return parent
- @property
- def real_device(self):
- if not self.encrypted:
- return self.path
- else:
- for blockdevice in json.loads(b''.join(sys_command('lsblk -J')).decode('UTF-8'))['blockdevices']:
- if (parent := self.find_parent_of(blockdevice, os.path.basename(self.path))):
- return f"/dev/{parent}"
- raise DiskError(f'Could not find appropriate parent for encrypted partition {self}')
-
def mount(self, target, fs=None, options=''):
if not self.mountpoint:
log(f'Mounting {self} to {target}', level=LOG_LEVELS.Info)
@@ -179,6 +312,21 @@ class Partition():
self.mountpoint = target
return True
+ def filesystem_supported(self):
+ """
+ The support for a filesystem (this partition) is tested by calling
+ partition.format() with a path set to '/dev/null' which returns two exceptions:
+ 1. SysCallError saying that /dev/null is not formattable - but the filesystem is supported
+ 2. UnknownFilesystemFormat that indicates that we don't support the given filesystem type
+ """
+ try:
+ self.format(self.filesystem, '/dev/null', log_formating=False, allow_formatting=True)
+ except SysCallError:
+ pass # We supported it, but /dev/null is not formatable as expected so the mkfs call exited with an error code
+ except UnknownFilesystemFormat as err:
+ raise err
+ return True
+
class Filesystem():
# TODO:
# When instance of a HDD is selected, check all usages and gracefully unmount them
@@ -188,13 +336,23 @@ class Filesystem():
self.mode = mode
def __enter__(self, *args, **kwargs):
- if self.mode == GPT:
- if sys_command(f'/usr/bin/parted -s {self.blockdevice.device} mklabel gpt',).exit_code == 0:
- return self
+ if self.blockdevice.keep_partitions is False:
+ log(f'Wiping {self.blockdevice} by using partition format {self.mode}', level=LOG_LEVELS.Debug)
+ if self.mode == GPT:
+ if sys_command(f'/usr/bin/parted -s {self.blockdevice.device} mklabel gpt',).exit_code == 0:
+ return self
+ else:
+ raise DiskError(f'Problem setting the partition format to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel gpt')
else:
- raise DiskError(f'Problem setting the partition format to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel gpt')
+ raise DiskError(f'Unknown mode selected to format in: {self.mode}')
+
+ # TODO: partition_table_type is hardcoded to GPT at the moment. This has to be changed.
+ elif self.mode == self.blockdevice.partition_table_type:
+ log(f'Kept partition format {self.mode} for {self.blockdevice}', level=LOG_LEVELS.Debug)
else:
- raise DiskError(f'Unknown mode selected to format in: {self.mode}')
+ raise DiskError(f'The selected partition table format {self.mode} does not match that of {self.blockdevice}.')
+
+ return self
def __repr__(self):
return f"Filesystem(blockdevice={self.blockdevice}, mode={self.mode})"
@@ -206,6 +364,11 @@ class Filesystem():
b''.join(sys_command(f'sync'))
return True
+ def find_partition(self, mountpoint):
+ for partition in self.blockdevice:
+ if partition.target_mountpoint == mountpoint or partition.mountpoint == mountpoint:
+ return partition
+
def raw_parted(self, string:str):
x = sys_command(f'/usr/bin/parted -s {string}')
o = b''.join(x)
@@ -220,15 +383,23 @@ class Filesystem():
"""
return self.raw_parted(string).exit_code
- def use_entire_disk(self, prep_mode=None):
- self.add_partition('primary', start='1MiB', end='513MiB', format='fat32')
+ def use_entire_disk(self, root_filesystem_type='ext4', encrypt_root_partition=True):
+ self.add_partition('primary', start='1MiB', end='513MiB', format='vfat')
self.set_name(0, 'EFI')
self.set(0, 'boot on')
- self.set(0, 'esp on') # TODO: Redundant, as in GPT mode it's an alias for "boot on"? https://www.gnu.org/software/parted/manual/html_node/set.html
- if prep_mode == 'luks2':
- self.add_partition('primary', start='513MiB', end='100%')
- else:
- self.add_partition('primary', start='513MiB', end='100%', format='ext4')
+ # TODO: Probably redundant because in GPT mode 'esp on' is an alias for "boot on"?
+ # https://www.gnu.org/software/parted/manual/html_node/set.html
+ self.set(0, 'esp on')
+ self.add_partition('primary', start='513MiB', end='100%')
+
+ self.blockdevice.partition[0].filesystem = 'vfat'
+ self.blockdevice.partition[1].filesystem = root_filesystem_type
+
+ self.blockdevice.partition[0].target_mountpoint = '/boot'
+ self.blockdevice.partition[1].target_mountpoint = '/'
+
+ if encrypt_root_partition:
+ self.blockdevice.partition[1].encrypted = True
def add_partition(self, type, start, end, format=None):
log(f'Adding partition to {self.blockdevice}', level=LOG_LEVELS.Info)
@@ -302,3 +473,24 @@ def harddrive(size=None, model=None, fuzzy=False):
continue
return collection[drive]
+
+def get_mount_info(path):
+ try:
+ output = b''.join(sys_command(f'/usr/bin/findmnt --json {path}'))
+ except SysCallError:
+ return {}
+
+ output = output.decode('UTF-8')
+ output = json.loads(output)
+ if 'filesystems' in output:
+ if len(output['filesystems']) > 1:
+ raise DiskError(f"Path '{path}' contains multiple mountpoints: {output['filesystems']}")
+
+ return output['filesystems'][0]
+
+def get_filesystem_type(path):
+ try:
+ handle = sys_command(f"blkid -o value -s TYPE {path}")
+ return b''.join(handle).strip().decode('UTF-8')
+ except SysCallError:
+ return None \ No newline at end of file
diff --git a/archinstall/lib/exceptions.py b/archinstall/lib/exceptions.py
index 84e6a766..5a5d47c6 100644
--- a/archinstall/lib/exceptions.py
+++ b/archinstall/lib/exceptions.py
@@ -2,6 +2,8 @@ class RequirementError(BaseException):
pass
class DiskError(BaseException):
pass
+class UnknownFilesystemFormat(BaseException):
+ pass
class ProfileError(BaseException):
pass
class SysCallError(BaseException):
@@ -9,4 +11,8 @@ class SysCallError(BaseException):
class ProfileNotFound(BaseException):
pass
class HardwareIncompatibilityError(BaseException):
+ pass
+class PermissionError(BaseException):
+ pass
+class UserError(BaseException):
pass \ No newline at end of file
diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py
index dc94b063..e87e4102 100644
--- a/archinstall/lib/general.py
+++ b/archinstall/lib/general.py
@@ -37,6 +37,7 @@ class JSON_Encoder:
## We'll need to iterate not just the value that default() usually gets passed
## But also iterate manually over each key: value pair in order to trap the keys.
+ copy = {}
for key, val in list(obj.items()):
if isinstance(val, dict):
val = json.loads(json.dumps(val, cls=JSON)) # This, is a EXTREMELY ugly hack..
@@ -44,9 +45,12 @@ class JSON_Encoder:
# trigger a encoding of sub-dictionaries.
else:
val = JSON_Encoder._encode(val)
- del(obj[key])
- obj[JSON_Encoder._encode(key)] = val
- return obj
+
+ if type(key) == str and key[0] == '!':
+ copy[JSON_Encoder._encode(key)] = '******'
+ else:
+ copy[JSON_Encoder._encode(key)] = val
+ return copy
elif hasattr(obj, 'json'):
return obj.json()
elif hasattr(obj, '__dump__'):
diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py
index 4fb6b706..06bdd05a 100644
--- a/archinstall/lib/installer.py
+++ b/archinstall/lib/installer.py
@@ -78,7 +78,7 @@ class Installer():
if len(args) >= 2 and args[1]:
#self.log(self.trace_log.decode('UTF-8'), level=LOG_LEVELS.Debug)
- self.log(args[1], level=LOG_LEVELS.Error)
+ self.log(args[1], level=LOG_LEVELS.Error, fg='red')
self.sync_log_to_install_medium()
diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py
index e1f14bab..e54641b8 100644
--- a/archinstall/lib/luks.py
+++ b/archinstall/lib/luks.py
@@ -6,31 +6,60 @@ from .output import log, LOG_LEVELS
from .storage import storage
class luks2():
- def __init__(self, partition, mountpoint, password, *args, **kwargs):
+ def __init__(self, partition, mountpoint, password, key_file=None, auto_unmount=False, *args, **kwargs):
self.password = password
self.partition = partition
self.mountpoint = mountpoint
self.args = args
self.kwargs = kwargs
+ self.key_file = key_file
+ self.auto_unmount = auto_unmount
+ self.filesystem = 'crypto_LUKS'
+ self.mapdev = None
def __enter__(self):
- key_file = self.encrypt(self.partition, self.password, *self.args, **self.kwargs)
- return self.unlock(self.partition, self.mountpoint, key_file)
+ #if self.partition.allow_formatting:
+ # self.key_file = self.encrypt(self.partition, *self.args, **self.kwargs)
+ #else:
+ if not self.key_file:
+ self.key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw" # TODO: Make disk-pw-file randomly unique?
+
+ if type(self.password) != bytes:
+ self.password = bytes(self.password, 'UTF-8')
+
+ with open(self.key_file, 'wb') as fh:
+ fh.write(self.password)
+
+ return self.unlock(self.partition, self.mountpoint, self.key_file)
def __exit__(self, *args, **kwargs):
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
+ if self.auto_unmount:
+ self.close()
+
if len(args) >= 2 and args[1]:
raise args[1]
return True
- def encrypt(self, partition, password, key_size=512, hash_type='sha512', iter_time=10000, key_file=None):
+ def encrypt(self, partition, password=None, key_size=512, hash_type='sha512', iter_time=10000, key_file=None):
# TODO: We should be able to integrate this into the main log some how.
# Perhaps post-mortem?
+ if not self.partition.allow_formatting:
+ raise DiskError(f'Could not encrypt volume {self.partition} due to it having a formatting lock.')
+
log(f'Encrypting {partition} (This might take a while)', level=LOG_LEVELS.Info)
if not key_file:
- key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw" # TODO: Make disk-pw-file randomly unique?
- if type(password) != bytes: password = bytes(password, 'UTF-8')
+ if self.key_file:
+ key_file = self.key_file
+ else:
+ key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw" # TODO: Make disk-pw-file randomly unique?
+
+ if not password:
+ password = self.password
+
+ if type(password) != bytes:
+ password = bytes(password, 'UTF-8')
with open(key_file, 'wb') as fh:
fh.write(password)
@@ -49,12 +78,23 @@ class luks2():
:param mountpoint: The name without absolute path, for instance "luksdev" will point to /dev/mapper/luksdev
:type mountpoint: str
"""
+ from .disk import get_filesystem_type
if '/' in mountpoint:
os.path.basename(mountpoint) # TODO: Raise exception instead?
sys_command(f'/usr/bin/cryptsetup open {partition.path} {mountpoint} --key-file {os.path.abspath(key_file)} --type luks2')
if os.path.islink(f'/dev/mapper/{mountpoint}'):
- return Partition(f'/dev/mapper/{mountpoint}', encrypted=True)
+ self.mapdev = f'/dev/mapper/{mountpoint}'
+ unlocked_partition = Partition(self.mapdev, encrypted=True, filesystem=get_filesystem_type(self.mapdev), autodetect_filesystem=False)
+ unlocked_partition.allow_formatting = self.partition.allow_formatting
+ return unlocked_partition
+
+ def close(self, mountpoint=None):
+ if not mountpoint:
+ mountpoint = self.mapdev
+
+ sys_command(f'/usr/bin/cryptsetup close {self.mapdev}')
+ return os.path.islink(self.mapdev) is False
- def close(self, mountpoint):
- sys_command(f'cryptsetup close /dev/mapper/{mountpoint}')
- return os.path.islink(f'/dev/mapper/{mountpoint}') is False \ No newline at end of file
+ def format(self, path):
+ if (handle := sys_command(f"/usr/bin/cryptsetup -q -v luksErase {path}")).exit_code != 0:
+ raise DiskError(f'Could not format {path} with {self.filesystem} because: {b"".join(handle)}') \ No newline at end of file
diff --git a/archinstall/lib/output.py b/archinstall/lib/output.py
index 956ad0c4..0e0a295b 100644
--- a/archinstall/lib/output.py
+++ b/archinstall/lib/output.py
@@ -5,6 +5,9 @@ import logging
from pathlib import Path
from .storage import storage
+# TODO: use logging's built in levels instead.
+# Altough logging is threaded and I wish to avoid that.
+# It's more Pythonistic or w/e you want to call it.
class LOG_LEVELS:
Critical = 0b001
Error = 0b010
@@ -108,10 +111,10 @@ def log(*args, **kwargs):
# In that case, we'll drop it.
return None
- try:
- journald.log(string, level=kwargs['level'])
- except ModuleNotFoundError:
- pass # Ignore writing to journald
+ try:
+ journald.log(string, level=kwargs.get('level', LOG_LEVELS.Info))
+ except ModuleNotFoundError:
+ pass # Ignore writing to journald
# Finally, print the log unless we skipped it based on level.
# We use sys.stdout.write()+flush() instead of print() to try and
diff --git a/archinstall/lib/profiles.py b/archinstall/lib/profiles.py
index f9aa206c..01c3288c 100644
--- a/archinstall/lib/profiles.py
+++ b/archinstall/lib/profiles.py
@@ -157,6 +157,23 @@ class Profile(Script):
def install(self):
return self.execute()
+ def has_prep_function(self):
+ with open(self.path, 'r') as source:
+ source_data = source.read()
+
+ # Some crude safety checks, make sure the imported profile has
+ # a __name__ check and if so, check if it's got a _prep_function()
+ # we can call to ask for more user input.
+ #
+ # If the requirements are met, import with .py in the namespace to not
+ # trigger a traditional:
+ # if __name__ == 'moduleName'
+ if '__name__' in source_data and '_prep_function' in source_data:
+ with self.load_instructions(namespace=f"{self.namespace}.py") as imported:
+ if hasattr(imported, '_prep_function'):
+ return True
+ return False
+
class Application(Profile):
def __repr__(self, *args, **kwargs):
return f'Application({os.path.basename(self.profile)})'
diff --git a/archinstall/lib/storage.py b/archinstall/lib/storage.py
index e881700f..9bda017d 100644
--- a/archinstall/lib/storage.py
+++ b/archinstall/lib/storage.py
@@ -17,5 +17,6 @@ storage = {
'UPSTREAM_URL' : 'https://raw.githubusercontent.com/Torxed/archinstall/master/profiles',
'PROFILE_DB' : None, # Used in cases when listing profiles is desired, not mandatory for direct profile grabing.
'LOG_PATH' : '/var/log/archinstall',
- 'LOG_FILE' : 'install.log'
+ 'LOG_FILE' : 'install.log',
+ 'MOUNT_POINT' : '/mnt'
}
diff --git a/archinstall/lib/user_interaction.py b/archinstall/lib/user_interaction.py
index fdbabe96..7e7f5873 100644
--- a/archinstall/lib/user_interaction.py
+++ b/archinstall/lib/user_interaction.py
@@ -1,10 +1,114 @@
+import getpass
from .exceptions import *
from .profiles import Profile
from .locale_helpers import search_keyboard_layout
+from .output import log, LOG_LEVELS
+from .storage import storage
+from .networking import list_interfaces
## TODO: Some inconsistencies between the selection processes.
## Some return the keys from the options, some the values?
+def get_password(prompt="Enter a password: "):
+ while (passwd := getpass.getpass(prompt)):
+ passwd_verification = getpass.getpass(prompt='And one more time for verification: ')
+ if passwd != passwd_verification:
+ log(' * Passwords did not match * ', bg='black', fg='red')
+ continue
+ return passwd
+ return None
+
+def ask_for_superuser_account(prompt='Create a required super-user with sudo privileges: ', forced=False):
+ while 1:
+ new_user = input(prompt).strip(' ')
+
+ if not new_user and forced:
+ # TODO: make this text more generic?
+ # It's only used to create the first sudo user when root is disabled in guided.py
+ log(' * Since root is disabled, you need to create a least one (super) user!', bg='black', fg='red')
+ continue
+ elif not new_user and not forced:
+ raise UserError("No superuser was created.")
+
+ password = get_password(prompt=f'Password for user {new_user}: ')
+ return {new_user: password}
+
+def ask_for_additional_users(prompt='Any additional users to install (leave blank for no users): '):
+ users = {}
+ super_users = {}
+
+ while 1:
+ new_user = input(prompt).strip(' ')
+ if not new_user:
+ break
+ password = get_password(prompt=f'Password for user {new_user}: ')
+
+ if input("Should this user be a sudo (super) user (y/N): ").strip(' ').lower() in ('y', 'yes'):
+ super_users[new_user] = password
+ else:
+ users[new_user] = password
+
+ return users, super_users
+
+def ask_to_configure_network():
+ # Optionally configure one network interface.
+ #while 1:
+ # {MAC: Ifname}
+ interfaces = {'ISO-CONFIG' : 'Copy ISO network configuration to installation', **list_interfaces()}
+
+ nic = generic_select(interfaces.values(), "Select one network interface to configure (leave blank to skip): ")
+ if nic and nic != 'Copy ISO network configuration to installation':
+ mode = generic_select(['DHCP (auto detect)', 'IP (static)'], f"Select which mode to configure for {nic}: ")
+ if mode == 'IP (static)':
+ while 1:
+ ip = input(f"Enter the IP and subnet for {nic} (example: 192.168.0.5/24): ").strip()
+ if ip:
+ break
+ else:
+ log(
+ "You need to enter a valid IP in IP-config mode.",
+ level=LOG_LEVELS.Warning,
+ bg='black',
+ fg='red'
+ )
+
+ if not len(gateway := input('Enter your gateway (router) IP address or leave blank for none: ').strip()):
+ gateway = None
+
+ dns = None
+ if len(dns_input := input('Enter your DNS servers (space separated, blank for none): ').strip()):
+ dns = dns_input.split(' ')
+
+ return {'nic': nic, 'dhcp': False, 'ip': ip, 'gateway' : gateway, 'dns' : dns}
+ else:
+ return {'nic': nic}
+ elif nic:
+ return nic
+
+ return None
+
+def ask_for_disk_layout():
+ options = {
+ 'keep-existing' : 'Keep existing partition layout and select which ones to use where.',
+ 'format-all' : 'Format entire drive and setup a basic partition scheme.',
+ 'abort' : 'Abort the installation.'
+ }
+
+ value = generic_select(options.values(), "Found partitions on the selected drive, (select by number) what you want to do: ")
+ return next((key for key, val in options.items() if val == value), None)
+
+def ask_for_main_filesystem_format():
+ options = {
+ 'btrfs' : 'btrfs',
+ 'ext4' : 'ext4',
+ 'xfs' : 'xfs',
+ 'f2fs' : 'f2fs',
+ 'vfat' : 'vfat'
+ }
+
+ value = generic_select(options.values(), "Select your main partitions filesystem by number or free-text: ")
+ return next((key for key, val in options.items() if val == value), None)
+
def generic_select(options, input_text="Select one of the above by index or absolute value: ", sort=True):
"""
A generic select function that does not output anything
@@ -93,23 +197,7 @@ def select_profile(options):
else:
RequirementError("Selected profile does not exist.")
- profile = Profile(None, selected_profile)
- with open(profile.path, 'r') as source:
- source_data = source.read()
-
- # Some crude safety checks, make sure the imported profile has
- # a __name__ check and if so, check if it's got a _prep_function()
- # we can call to ask for more user input.
- #
- # If the requirements are met, import with .py in the namespace to not
- # trigger a traditional:
- # if __name__ == 'moduleName'
- if '__name__' in source_data and '_prep_function' in source_data:
- with profile.load_instructions(namespace=f"{selected_profile}.py") as imported:
- if hasattr(imported, '_prep_function'):
- return profile, imported
-
- return selected_profile
+ return Profile(None, selected_profile)
raise RequirementError("Selecting profiles require a least one profile to be given as an option.")