Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib')
-rw-r--r--archinstall/lib/disk.py118
-rw-r--r--archinstall/lib/user_interaction.py280
2 files changed, 394 insertions, 4 deletions
diff --git a/archinstall/lib/disk.py b/archinstall/lib/disk.py
index 8f67111a..ac59600d 100644
--- a/archinstall/lib/disk.py
+++ b/archinstall/lib/disk.py
@@ -31,10 +31,10 @@ class BlockDevice:
self.info = info
self.keep_partitions = True
self.part_cache = OrderedDict()
+
# TODO: Currently disk encryption is a BIT misleading.
# It's actually partition-encryption, but for future-proofing this
# I'm placing the encryption password on a BlockDevice level.
- self.encryption_password = None
def __repr__(self, *args, **kwargs):
return f"BlockDevice({self.device})"
@@ -48,6 +48,9 @@ class BlockDevice:
raise KeyError(f'{self} does not contain information: "{key}"')
return self.info[key]
+ def __len__(self):
+ return len(self.partitions)
+
def json(self):
"""
json() has precedence over __dump__, so this is a way
@@ -61,12 +64,22 @@ class BlockDevice:
def __dump__(self):
return {
- 'path': self.path,
- 'info': self.info,
- 'partition_cache': self.part_cache
+ self.path : {
+ 'partuuid' : self.uuid,
+ 'wipe' : self.info.get('wipe', None),
+ 'partitions' : [part.__dump__() for part in self.partitions.values()]
+ }
}
@property
+ def partition_type(self):
+ output = b"".join(sys_command(f"lsblk --json -o+PTTYPE {self.path}"))
+ output = json.loads(output.decode('UTF-8'))
+
+ for device in output['blockdevices']:
+ return device['pttype']
+
+ @property
def device(self):
"""
Returns the actual device-endpoint of the BlockDevice.
@@ -144,6 +157,16 @@ class BlockDevice:
for partition in json.loads(lsblk.decode('UTF-8'))['blockdevices']:
return partition.get('uuid', None)
+ @property
+ def size(self):
+ output = b"".join(sys_command(f"lsblk --json -o+SIZE {self.path}"))
+ output = json.loads(output.decode('UTF-8'))
+
+ for device in output['blockdevices']:
+ assert device['size'][-1] == 'G' # Make sure we're counting in Gigabytes, otherwise the next logic fails.
+
+ return float(device['size'][:-1])
+
def has_partitions(self):
return len(self.partitions)
@@ -210,6 +233,82 @@ class Partition:
else:
return f'Partition(path={self.path}, size={self.size}, fs={self.filesystem}{mount_repr})'
+ def __dump__(self):
+ return {
+ 'type' : 'primary',
+ 'PARTUUID' : self.uuid,
+ 'wipe' : self.allow_formatting,
+ 'boot' : self.boot,
+ 'ESP' : self.boot,
+ 'mountpoint' : self.target_mountpoint,
+ 'encrypted' : self._encrypted,
+ 'start' : self.start,
+ 'size' : self.end,
+ 'filesystem' : {
+ 'format' : get_filesystem_type(self.path)
+ }
+ }
+
+ @property
+ def sector_size(self):
+ output = b"".join(sys_command(f"lsblk --json -o+LOG-SEC {self.path}"))
+ output = json.loads(output.decode('UTF-8'))
+
+ for device in output['blockdevices']:
+ return device.get('log-sec', None)
+
+ @property
+ def start(self):
+ output = b"".join(sys_command(f"sfdisk --json {self.block_device.path}"))
+ output = json.loads(output.decode('UTF-8'))
+
+ for partition in output.get('partitiontable', {}).get('partitions', []):
+ if partition['node'] == self.path:
+ return partition['start']# * self.sector_size
+
+ @property
+ def end(self):
+ # TODO: Verify that the logic holds up, that 'size' is the size without 'start' added to it.
+ output = b"".join(sys_command(f"sfdisk --json {self.block_device.path}"))
+ output = json.loads(output.decode('UTF-8'))
+
+ for partition in output.get('partitiontable', {}).get('partitions', []):
+ if partition['node'] == self.path:
+ return partition['size']# * self.sector_size
+
+ @property
+ def boot(self):
+ output = b"".join(sys_command(f"sfdisk --json {self.block_device.path}"))
+ output = json.loads(output.decode('UTF-8'))
+
+ # Get the bootable flag from the sfdisk output:
+ # {
+ # "partitiontable": {
+ # "label":"dos",
+ # "id":"0xd202c10a",
+ # "device":"/dev/loop0",
+ # "unit":"sectors",
+ # "sectorsize":512,
+ # "partitions": [
+ # {"node":"/dev/loop0p1", "start":2048, "size":10483712, "type":"83", "bootable":true}
+ # ]
+ # }
+ # }
+
+ for partition in output.get('partitiontable', {}).get('partitions', []):
+ if partition['node'] == self.path:
+ return partition.get('bootable', False)
+
+ return False
+
+ @property
+ def partition_type(self):
+ output = b"".join(sys_command(f"lsblk --json -o+PTTYPE {self.path}"))
+ output = json.loads(output.decode('UTF-8'))
+
+ for device in output['blockdevices']:
+ return device['pttype']
+
@property
def uuid(self) -> Optional[str]:
"""
@@ -663,3 +762,14 @@ def disk_layouts():
except SysCallError as err:
log(f"Could not return disk layouts: {err}")
return None
+
+
+def encrypted_partitions(blockdevices :dict) -> bool:
+ for partition in blockdevices.values():
+ if partition.get('encrypted', False):
+ yield partition
+
+def find_partition_by_mountpoint(partitions, relative_mountpoint :str):
+ for partition in partitions:
+ if partition.get('mountpoint', None) == relative_mountpoint:
+ return partition \ No newline at end of file
diff --git a/archinstall/lib/user_interaction.py b/archinstall/lib/user_interaction.py
index 50c62aa9..d3548e6b 100644
--- a/archinstall/lib/user_interaction.py
+++ b/archinstall/lib/user_interaction.py
@@ -188,8 +188,24 @@ def generic_multi_select(options, text="Select one or more of the options above
except RequirementError as e:
log(f" * {e} * ", fg='red')
+ sys.stdout.write('\n')
+ sys.stdout.flush()
return selected_options
+def select_encrypted_partitions(blockdevices :dict) -> dict:
+ print(blockdevices[0])
+
+ if len(blockdevices) == 1:
+ if len(blockdevices[0]['partitions']) == 2:
+ root = find_partition_by_mountpoint(blockdevices[0]['partitions'], '/')
+ blockdevices[0]['partitions'][root]['encrypted'] = True
+ return True
+
+ options = []
+ for partition in blockdevices.values():
+ options.append({key: val for key, val in partition.items() if val})
+
+ print(generic_multi_select(options, f"Choose which partitions to encrypt (leave blank when done): "))
class MiniCurses:
def __init__(self, width, height):
@@ -535,6 +551,270 @@ def generic_select(options, input_text="Select one of the above by index or abso
return selected_option
+def select_partition_layout(block_device):
+ return {
+ "/dev/sda": { # Block Device level
+ "wipe": False, # Safety flags
+ "partitions" : [ # Affected / New partitions
+ {
+ "PARTUUID" : "654bb317-1b73-4339-9a00-7222792f4ba9", # If existing partition
+ "wipe" : False, # Safety flags
+ "boot" : True, # Safety flags / new flags
+ "ESP" : True, # Safety flags / new flags
+ "mountpoint" : "/mnt/boot"
+ }
+ ]
+ },
+ "/dev/sdb" : {
+ "wipe" : True,
+ "partitions" : [
+ {
+ # No PARTUUID required here since it's a new partition
+ "type" : "primary", # parted options
+ "size" : "100%",
+ "filesystem" : {
+ "encrypted" : True, # TODO: Not sure about this here
+ "format": "btrfs", # mkfs options
+ },
+ "mountpoint" : "/mnt"
+ }
+ ]
+ }
+ }
+
+def valid_fs_type(fstype :str) -> bool:
+ # https://www.gnu.org/software/parted/manual/html_node/mkpart.html
+
+ return fstype in [
+ "ext2",
+ "fat16", "fat32",
+ "hfs", "hfs+", "hfsx",
+ "linux-swap",
+ "NTFS",
+ "reiserfs",
+ "ufs",
+ "btrfs",
+ ]
+
+def valid_parted_position(pos :str):
+ if not len(pos):
+ return False
+
+ if pos.isdigit():
+ return True
+
+ if pos[-1] == '%' and pos[:-1].isdigit():
+ return True
+
+ if pos[-3:].lower() in ['mib', 'kib', 'b', 'tib'] and pos[:-3].isdigit():
+ return True
+
+ return False
+
+def partition_overlap(partitions :list, start :str, end :str) -> bool:
+ # TODO: Implement sanity check
+ return False
+
+def get_default_partition_layout(block_devices):
+ if len(block_devices) == 1:
+ return {
+ block_devices[0] : [
+ { # Boot
+ "type" : "primary",
+ "start" : "0MiB",
+ "size" : "513MiB",
+ "boot" : True,
+ "mountpoint" : "/boot",
+ "filesystem" : {
+ "format" : "fat32"
+ }
+ },
+ { # Root
+ "type" : "primary",
+ "start" : "513MiB",
+ "encrypted" : True,
+ "size" : f"{max(block_devices[0].size*0.2, 20)}GiB",
+ "mountpoint" : "",
+ "filesystem" : {
+ "format" : "btrfs"
+ }
+ },
+ { # Home
+ "type" : "primary",
+ "encrypted" : True,
+ "start" : f"{max(block_devices[0].size*0.2, 20)}GiB",
+ "size" : "100%",
+ "mountpoint" : "/home",
+ "filesystem" : {
+ "format" : "btrfs"
+ }
+ }
+ ]
+ }
+
+def wipe_and_create_partitions(block_device):
+ if hasUEFI():
+ partition_type = 'gpt'
+ else:
+ partition_type = 'msdos'
+
+ partitions_result = [] # Test code: [part.__dump__() for part in block_device.partitions.values()]
+ suggested_layout = [
+ { # Boot
+ "type" : "primary",
+ "start" : "0MiB",
+ "size" : "513MiB",
+ "boot" : True,
+ "mountpoint" : "/boot",
+ "filesystem" : {
+ "format" : "fat32"
+ }
+ },
+ { # Root
+ "type" : "primary",
+ "start" : "513MiB",
+ "encrypted" : True,
+ "size" : f"{max(block_device.size*0.2, 20)}GiB",
+ "mountpoint" : "",
+ "filesystem" : {
+ "format" : "btrfs"
+ }
+ },
+ { # Home
+ "type" : "primary",
+ "encrypted" : True,
+ "start" : f"{max(block_device.size*0.2, 20)}GiB",
+ "size" : "100%",
+ "mountpoint" : "/home",
+ "filesystem" : {
+ "format" : "btrfs"
+ }
+ }
+ ]
+ # TODO: Squeeze in BTRFS subvolumes here
+
+ while True:
+ modes = [
+ "Create new partition",
+ "Suggest partition layout",
+ "Delete partition" if len(partitions_result) else "",
+ "Assign mount-point for partition" if len(partitions_result) else "",
+ "Mark/Unmark a partition as encrypted" if len(partitions_result) else "",
+ "Mark/Unmark a partition as bootable (automatic for /boot)" if len(partitions_result) else ""
+ ]
+
+ # Print current partition layout:
+ if len(partitions_result):
+ print('Current partition layout:')
+ for partition in partitions_result:
+ print({key: val for key, val in partition.items() if val})
+ print()
+
+ task = generic_select(modes,
+ input_text=f"Select what to do with {block_device} (leave blank when done): ")
+
+ if task == 'Create new partition':
+ if partition_type == 'gpt':
+ # https://www.gnu.org/software/parted/manual/html_node/mkpart.html
+ # https://www.gnu.org/software/parted/manual/html_node/mklabel.html
+ name = input("Enter a desired name for the partition: ").strip()
+ fstype = input("Enter a desired filesystem type for the partition: ").strip()
+ start = input("Enter the start sector of the partition (percentage or block number, ex: 0%): ").strip()
+ end = input("Enter the end sector of the partition (percentage or block number, ex: 100%): ").strip()
+
+ if valid_parted_position(start) and valid_parted_position(end) and valid_fs_type(fstype):
+ if partition_overlap(partitions_result, start, end):
+ log(f"This partition overlaps with other partitions on the drive! Ignoring this partition creation.", fg="red")
+ continue
+
+ partitions_result.append({
+ "type" : "primary", # Strictly only allowed under MSDOS, but GPT accepts it so it's "safe" to inject
+ "start" : start,
+ "size" : end,
+ "mountpoint" : None,
+ "filesystem" : {
+ "format" : fstype
+ }
+ })
+ else:
+ log(f"Invalid start, end or fstype for this partition. Ignoring this partition creation.", fg="red")
+ continue
+ elif task == "Suggest partition layout":
+ if len(partitions_result):
+ if input(f"{block_device} contains queued partitions, this will remove those, are you sure? y/N: ").strip().lower() in ('', 'n'):
+ continue
+
+ partitions_result = [*suggested_layout]
+ elif task is None:
+ return partitions_result
+ else:
+ for index, partition in enumerate(partitions_result):
+ print(f"{index}: Start: {partition['start']}, End: {partition['size']} ({partition['filesystem']['format']}{', mounting at: '+partition['mountpoint'] if partition['mountpoint'] else ''})")
+
+ if task == "Delete partition":
+ if (partition := generic_select(partitions_result, 'Select which partition to delete: ', options_output=False)):
+ del(partitions_result[partitions_result.index(partition)])
+ elif task == "Assign mount-point for partition":
+ if (partition := generic_select(partitions_result, 'Select which partition to mount where: ', options_output=False)):
+ print(' * Partition mount-points are relative to inside the installation, the boot would be /boot as an example.')
+ mountpoint = input('Select where to mount partition (leave blank to remove mountpoint): ').strip()
+
+ if len(mountpoint):
+ partitions_result[partitions_result.index(partition)]['mountpoint'] = mountpoint
+ if mountpoint == '/boot':
+ log(f"Marked partition as bootable because mountpoint was set to /boot.", fg="yellow")
+ partitions_result[partitions_result.index(partition)]['boot'] = True
+ else:
+ del(partitions_result[partitions_result.index(partition)]['mountpoint'])
+
+ elif task == "Mark/Unmark a partition as encrypted":
+ if (partition := generic_select(partitions_result, 'Select which partition to mark as encrypted: ', options_output=False)):
+ # Negate the current encryption marking
+ partitions_result[partitions_result.index(partition)]['encrypted'] = not partitions_result[partitions_result.index(partition)].get('encrypted', False)
+
+ elif task == "Mark/Unmark a partition as bootable (automatic for /boot)":
+ if (partition := generic_select(partitions_result, 'Select which partition to mark as bootable: ', options_output=False)):
+ partitions_result[partitions_result.index(partition)]['boot'] = not partitions_result[partitions_result.index(partition)].get('boot', False)
+
+ return partitions_result
+
+def select_individual_blockdevice_usage(block_devices :list):
+ result = {}
+
+ for device in block_devices:
+ log(f'Select what to do with {device}', fg="yellow")
+ modes = [
+ "Wipe and create new partitions",
+ "Re-use partitions"
+ ]
+
+ device_mode = generic_select(modes)
+
+ if device_mode == "Re-use partitions":
+ layout = select_partition_layout(device)
+ elif device_mode == "Wipe and create new partitions":
+ layout = wipe_and_create_partitions(device)
+ else:
+ continue
+
+ result[device] = layout
+
+ return result
+
+
+def select_disk_layout(block_devices :list):
+ modes = [
+ "Wipe all selected drives and use a best-effort default partition layout",
+ "Select what to do with each individual drive (followed by partition usage)"
+ ]
+
+ mode = generic_select(modes, input_text=f"Select what you wish to do with the selected block devices: ")
+
+ if mode == 'Wipe all selected drives and use a best-effort default partition layout':
+ return get_default_partition_layout(block_devices)
+ else:
+ return select_individual_blockdevice_usage(block_devices)
+
def select_disk(dict_o_disks):
"""