From 2fcd8198b28744bad471b11d287a39ead267af67 Mon Sep 17 00:00:00 2001 From: Anton Hvornum Date: Fri, 5 Nov 2021 16:27:01 +0100 Subject: Cleaned up all flake8 issues/warnings. Did some code cleaning as well, mostly how we called things in guided.py but also some SysCommand calls --- archinstall/lib/disk/__init__.py | 2 +- archinstall/lib/disk/blockdevice.py | 14 ++++++++------ archinstall/lib/disk/btrfs.py | 13 +++++++------ archinstall/lib/disk/filesystem.py | 12 ++++++------ archinstall/lib/disk/helpers.py | 5 +++-- archinstall/lib/disk/partition.py | 11 +++++++---- archinstall/lib/disk/user_guides.py | 5 +++-- archinstall/lib/disk/validators.py | 2 +- archinstall/lib/general.py | 2 +- archinstall/lib/hardware.py | 4 ++++ archinstall/lib/installer.py | 35 +++++++++++++++++++++-------------- archinstall/lib/luks.py | 9 +++++++-- archinstall/lib/mirrors.py | 14 +++++++------- archinstall/lib/packages.py | 2 +- archinstall/lib/profiles.py | 1 + archinstall/lib/services.py | 3 ++- archinstall/lib/user_interaction.py | 32 +++++++++++++++----------------- 17 files changed, 95 insertions(+), 71 deletions(-) (limited to 'archinstall') diff --git a/archinstall/lib/disk/__init__.py b/archinstall/lib/disk/__init__.py index 352d04b9..bb6eb815 100644 --- a/archinstall/lib/disk/__init__.py +++ b/archinstall/lib/disk/__init__.py @@ -4,4 +4,4 @@ from .blockdevice import BlockDevice from .filesystem import Filesystem, MBR, GPT from .partition import * from .user_guides import * -from .validators import * \ No newline at end of file +from .validators import * diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py index d278fa2e..8f5c0a85 100644 --- a/archinstall/lib/disk/blockdevice.py +++ b/archinstall/lib/disk/blockdevice.py @@ -1,6 +1,8 @@ import os import json import logging +from .exceptions import DiskError +from .helpers import all_disks from ..output import log from ..general import SysCommand @@ -57,7 +59,7 @@ class BlockDevice: @property def partition_type(self): output = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.path}").decode('UTF-8')) - + for device in output['blockdevices']: return device['pttype'] @@ -164,21 +166,21 @@ class BlockDevice: @property def size(self): output = json.loads(SysCommand(f"lsblk --json -o+SIZE {self.path}").decode('UTF-8')) - + for device in output['blockdevices']: return self.convert_size_to_gb(device['size']) @property def bus_type(self): output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8')) - + for device in output['blockdevices']: return device['tran'] - + @property def spinning(self): output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8')) - + for device in output['blockdevices']: return device['rota'] is True @@ -220,4 +222,4 @@ class BlockDevice: def get_partition(self, uuid): for partition in self: if partition.uuid == uuid: - return partition \ No newline at end of file + return partition diff --git a/archinstall/lib/disk/btrfs.py b/archinstall/lib/disk/btrfs.py index 6fafab34..7ae4f6a6 100644 --- a/archinstall/lib/disk/btrfs.py +++ b/archinstall/lib/disk/btrfs.py @@ -1,4 +1,5 @@ -import pathlib, glob +import pathlib +import glob import logging from typing import Union from .helpers import get_mount_info @@ -27,9 +28,9 @@ def mount_subvolume(installation, subvolume_location :Union[pathlib.Path, str], if not target.exists(): target.mkdir(parents=True) - if glob.glob(str(target/'*')) and force is False: + if glob.glob(str(target / '*')) and force is False: raise DiskError(f"Cannot mount subvolume to {target} because it contains data (non-empty folder target)") - + log(f"Mounting {target} as a subvolume", level=logging.INFO) # Mount the logical volume to the physical structure mount_information, mountpoint_device_real_path = get_mount_info(target, traverse=True, return_real_path=True) @@ -46,7 +47,7 @@ def create_subvolume(installation, subvolume_location :Union[pathlib.Path, str]) @installation: archinstall.Installer instance @subvolume_location: a localized string or path inside the installation / or /boot for instance without specifying /mnt/boot """ - + installation_mountpoint = installation.target if type(installation_mountpoint) == str: installation_mountpoint = pathlib.Path(installation_mountpoint) @@ -61,7 +62,7 @@ def create_subvolume(installation, subvolume_location :Union[pathlib.Path, str]) if not target.parent.exists(): target.parent.mkdir(parents=True) - if glob.glob(str(target/'*')) and force is False: + if glob.glob(str(target / '*')): raise DiskError(f"Cannot create subvolume at {target} because it contains data (non-empty folder target)") # Remove the target if it exists @@ -70,4 +71,4 @@ def create_subvolume(installation, subvolume_location :Union[pathlib.Path, str]) log(f"Creating a subvolume on {target}", level=logging.INFO) if (cmd := SysCommand(f"btrfs subvolume create {target}")).exit_code != 0: - raise DiskError(f"Could not create a subvolume at {target}: {cmd}") \ No newline at end of file + raise DiskError(f"Could not create a subvolume at {target}: {cmd}") diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py index d8cc85f9..69593d08 100644 --- a/archinstall/lib/disk/filesystem.py +++ b/archinstall/lib/disk/filesystem.py @@ -1,8 +1,9 @@ import time import logging import json +from .exceptions import DiskError from .partition import Partition -from .blockdevice import BlockDevice +from .validators import valid_fs_type from ..general import SysCommand from ..output import log from ..storage import storage @@ -55,7 +56,7 @@ class Filesystem: def partuuid_to_index(self, uuid): output = json.loads(SysCommand(f"lsblk --json -o+PARTUUID {self.blockdevice.device}").decode('UTF-8')) - + for device in output['blockdevices']: for index, partition in enumerate(device['children']): if partition['partuuid'].lower() == uuid: @@ -101,7 +102,7 @@ class Filesystem: partition['password'] = get_password(f"Enter a encryption password for {partition['device_instance']}") partition['device_instance'].encrypt(password=partition['password']) - with luks2(partition['device_instance'], storage.get('ENC_IDENTIFIER', 'ai')+'loop', partition['password']) as unlocked_device: + with luks2(partition['device_instance'], storage.get('ENC_IDENTIFIER', 'ai') + 'loop', partition['password']) as unlocked_device: if not partition.get('format'): if storage['arguments'] == 'silent': raise ValueError(f"Missing fs-type to format on newly created encrypted partition {partition['device_instance']}") @@ -113,7 +114,7 @@ class Filesystem: while True: partition['filesystem']['format'] = input(f"Enter a valid fs-type for newly encrypted partition {partition['filesystem']['format']}: ").strip() if not partition['filesystem']['format'] or valid_fs_type(partition['filesystem']['format']) is False: - pint("You need to enter a valid fs-type in order to continue. See `man parted` for valid fs-type's.") + print("You need to enter a valid fs-type in order to continue. See `man parted` for valid fs-type's.") continue break @@ -170,7 +171,6 @@ class Filesystem: raise DiskError(f"New partition never showed up after adding new partition on {self} (timeout 10 seconds).") time.sleep(0.025) - # Todo: Find a better way to detect if the new UUID of the partition has showed up. # But this will address (among other issues) time.sleep(float(storage['arguments'].get('disk-sleep', 2.0))) # Let the kernel catch up with quick block devices (nvme for instance) @@ -190,4 +190,4 @@ class Filesystem: SysCommand(f'bash -c "umount {device}?"') except: pass - return self.raw_parted(f'{device} mklabel {disk_label}').exit_code == 0 \ No newline at end of file + return self.raw_parted(f'{device} mklabel {disk_label}').exit_code == 0 diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py index 341b732f..8e044ce3 100644 --- a/archinstall/lib/disk/helpers.py +++ b/archinstall/lib/disk/helpers.py @@ -1,10 +1,11 @@ import re +import os import json import logging import pathlib from typing import Union from .blockdevice import BlockDevice -from ..exceptions import SysCallError +from ..exceptions import SysCallError, DiskError from ..general import SysCommand from ..output import log @@ -199,4 +200,4 @@ def find_partition_by_mountpoint(block_devices, relative_mountpoint :str): for device in block_devices: for partition in block_devices[device]['partitions']: if partition.get('mountpoint', None) == relative_mountpoint: - return partition \ No newline at end of file + return partition diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py index 40a83d6b..afe46db3 100644 --- a/archinstall/lib/disk/partition.py +++ b/archinstall/lib/disk/partition.py @@ -4,9 +4,12 @@ import time import logging import json import os +import hashlib from typing import Optional from .blockdevice import BlockDevice +from .exceptions import DiskError, SysCallError, UnknownFilesystemFormat from .helpers import get_mount_info, get_filesystem_type +from .storage import storage from ..output import log from ..general import SysCommand @@ -82,14 +85,14 @@ class Partition: @property def sector_size(self): output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.path}").decode('UTF-8')) - + for device in output['blockdevices']: return device.get('log-sec', None) @property def start(self): output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) - + for partition in output.get('partitiontable', {}).get('partitions', []): if partition['node'] == self.path: return partition['start']# * self.sector_size @@ -173,7 +176,7 @@ class Partition: from .luks import luks2 try: - with luks2(self, storage.get('ENC_IDENTIFIER', 'ai')+'loop', password, auto_unmount=True) as unlocked_device: + with luks2(self, storage.get('ENC_IDENTIFIER', 'ai') + 'loop', password, auto_unmount=True) as unlocked_device: return unlocked_device.filesystem except SysCallError: return None @@ -346,4 +349,4 @@ class Partition: pass # We supported it, but /dev/null is not formatable as expected so the mkfs call exited with an error code except UnknownFilesystemFormat as err: raise err - return True \ No newline at end of file + return True diff --git a/archinstall/lib/disk/user_guides.py b/archinstall/lib/disk/user_guides.py index 6f8a1edb..e235dfc9 100644 --- a/archinstall/lib/disk/user_guides.py +++ b/archinstall/lib/disk/user_guides.py @@ -1,4 +1,5 @@ import logging +from .helpers import sort_block_devices_based_on_performance, select_largest_device, select_disk_larger_than_or_close_to from ..output import log def suggest_single_disk_layout(block_device, default_filesystem=None): @@ -55,7 +56,7 @@ def suggest_single_disk_layout(block_device, default_filesystem=None): } } else: - pass #... implement a guided setup + pass # ... implement a guided setup elif block_device.size >= MIN_SIZE_TO_ALLOW_HOME_PART: # If we don't want to use subvolumes, @@ -146,4 +147,4 @@ def suggest_multi_disk_layout(block_devices, default_filesystem=None): } }) - return layout \ No newline at end of file + return layout diff --git a/archinstall/lib/disk/validators.py b/archinstall/lib/disk/validators.py index 0c9f5a74..e0ab6a86 100644 --- a/archinstall/lib/disk/validators.py +++ b/archinstall/lib/disk/validators.py @@ -39,4 +39,4 @@ def valid_fs_type(fstype :str) -> bool: "reiserfs", "udf", # "ufs", not included in `man parted` "xfs", # `man parted` allows this - ] \ No newline at end of file + ] diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py index 4cbe9692..887a60d3 100644 --- a/archinstall/lib/general.py +++ b/archinstall/lib/general.py @@ -14,7 +14,7 @@ except: import select EPOLLIN = 0 EPOLLHUP = 0 - + class epoll(): """ #!if windows Create a epoll() implementation that simulates the epoll() behavior. diff --git a/archinstall/lib/hardware.py b/archinstall/lib/hardware.py index bbfb06a9..cf99a530 100644 --- a/archinstall/lib/hardware.py +++ b/archinstall/lib/hardware.py @@ -99,9 +99,13 @@ def has_wifi() -> bool: def has_cpu_vendor(vendor_id: str) -> bool: return any(cpu.get("vendor_id") == vendor_id for cpu in cpuinfo()) + has_amd_cpu = partial(has_cpu_vendor, "AuthenticAMD") + + has_intel_cpu = partial(has_cpu_vendor, "GenuineIntel") + def has_uefi() -> bool: return os.path.isdir('/sys/firmware/efi') diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py index 0bdddb2e..47a26d6d 100644 --- a/archinstall/lib/installer.py +++ b/archinstall/lib/installer.py @@ -1,14 +1,23 @@ import time -from .disk import * -from .hardware import * +import logging +import os +import shutil +import pathlib +import subprocess +import glob +from .disk import get_partitions_in_use, Partition, find_partition +from .general import SysCommand +from .hardware import has_uefi, is_vm, cpu_vendor from .locale_helpers import verify_keyboard_layout, verify_x11_keyboard_layout from .disk.helpers import get_mount_info -from .mirrors import * +from .mirrors import use_mirrors from .plugins import plugins from .storage import storage -from .user_interaction import * +# from .user_interaction import * +from .output import log +from .profiles import Profile from .disk.btrfs import create_subvolume, mount_subvolume -from .exceptions import DiskError, ServiceException +from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError # Any package that the Installer() is responsible for (optional and the default ones) __packages__ = ["base", "base-devel", "linux-firmware", "linux", "linux-lts", "linux-zen", "linux-hardened"] @@ -139,7 +148,7 @@ class Installer: for mountpoint in sorted(mountpoints.keys()): if mountpoints[mountpoint]['encrypted']: - loopdev = storage.get('ENC_IDENTIFIER', 'ai')+'loop' + loopdev = storage.get('ENC_IDENTIFIER', 'ai') + 'loop' password = mountpoints[mountpoint]['password'] with luks2(mountpoints[mountpoint]['device_instance'], loopdev, password, auto_unmount=False) as unlocked_device: unlocked_device.mount(f"{self.target}{mountpoint}") @@ -198,7 +207,7 @@ class Installer: self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO) with open(f"{self.target}/etc/fstab", 'a') as fstab_fh: - fstab_fh.write(SysCommand(f'/usr/bin/genfstab {flags} {self.target}').decode()) + fstab_fh.write((fstab := SysCommand(f'/usr/bin/genfstab {flags} {self.target}')).decode()) if not os.path.isfile(f'{self.target}/etc/fstab'): raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n{fstab}') @@ -554,7 +563,7 @@ class Installer: self.helper_flags['bootloader'] = True return True else: - boot_partition = filesystem.find_partition(mountpoint=f"{self.target}/boot") + boot_partition = find_partition(mountpoint=f"{self.target}/boot") SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --target=i386-pc --recheck {boot_partition.path}') SysCommand(f'/usr/bin/arch-chroot {self.target} grub-mkconfig -o /boot/grub/grub.cfg') self.helper_flags['bootloader'] = True @@ -595,14 +604,14 @@ class Installer: if not handled_by_plugin: self.log(f'Creating user {user}', level=logging.INFO) - o = b''.join(SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}')) + SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}') if password: self.user_set_pw(user, password) if groups: for group in groups: - o = b''.join(SysCommand(f'/usr/bin/arch-chroot {self.target} gpasswd -a {user} {group}')) + SysCommand(f'/usr/bin/arch-chroot {self.target} gpasswd -a {user} {group}') if sudo and self.enable_sudo(user): self.helper_flags['user'] = True @@ -614,14 +623,12 @@ class Installer: # This means the root account isn't locked/disabled with * in /etc/passwd self.helper_flags['user'] = True - o = b''.join(SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"echo '{user}:{password}' | chpasswd\"")) - pass + SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"echo '{user}:{password}' | chpasswd\"") def user_set_shell(self, user, shell): self.log(f'Setting shell for {user} to {shell}', level=logging.INFO) - o = b''.join(SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"")) - pass + SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"") def set_keyboard_language(self, language: str) -> bool: if len(language.strip()): diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py index 781bed43..d10058ef 100644 --- a/archinstall/lib/luks.py +++ b/archinstall/lib/luks.py @@ -1,9 +1,14 @@ +import json +import logging +import os import pathlib +import shlex +import time from .disk import Partition -from .general import * +from .general import SysCommand from .output import log - +from .exceptions import SysCallError, DiskError class luks2: def __init__(self, partition, mountpoint, password, key_file=None, auto_unmount=False, *args, **kwargs): diff --git a/archinstall/lib/mirrors.py b/archinstall/lib/mirrors.py index ed34b5d5..5fad6cb6 100644 --- a/archinstall/lib/mirrors.py +++ b/archinstall/lib/mirrors.py @@ -1,8 +1,9 @@ +import logging import urllib.error import urllib.request from typing import Union, Mapping, Iterable -from .general import * +from .general import SysCommand from .output import log def sort_mirrorlist(raw_data :bytes, sort_order=["https", "http"]) -> bytes: @@ -26,7 +27,7 @@ def sort_mirrorlist(raw_data :bytes, sort_order=["https", "http"]) -> bytes: """ comments_and_whitespaces = b"" - categories = {key: [] for key in sort_order+["Unknown"]} + categories = {key: [] for key in sort_order + ["Unknown"]} for line in raw_data.split(b"\n"): if line[0:2] in (b'##', b''): comments_and_whitespaces += line + b'\n' @@ -35,16 +36,15 @@ def sort_mirrorlist(raw_data :bytes, sort_order=["https", "http"]) -> bytes: opening, url = opening.strip(), url.strip() if (category := url.split(b'://',1)[0].decode('UTF-8')) in categories: categories[category].append(comments_and_whitespaces) - categories[category].append(opening+b' = '+url+b'\n') + categories[category].append(opening + b' = ' + url + b'\n') else: categories["Unknown"].append(comments_and_whitespaces) - categories["Unknown"].append(opening+b' = '+url+b'\n') + categories["Unknown"].append(opening + b' = ' + url + b'\n') comments_and_whitespaces = b"" - new_raw_data = b'' - for category in sort_order+["Unknown"]: + for category in sort_order + ["Unknown"]: for line in categories[category]: new_raw_data += line @@ -115,7 +115,7 @@ def insert_mirrors(mirrors, *args, **kwargs): def use_mirrors( regions: Mapping[str, Iterable[str]], - destination: str ='/etc/pacman.d/mirrorlist' + destination: str = '/etc/pacman.d/mirrorlist' ) -> None: log(f'A new package mirror-list has been created: {destination}', level=logging.INFO) with open(destination, 'w') as mirrorlist: diff --git a/archinstall/lib/packages.py b/archinstall/lib/packages.py index 0178d257..ffc44cbe 100644 --- a/archinstall/lib/packages.py +++ b/archinstall/lib/packages.py @@ -4,7 +4,7 @@ import urllib.error import urllib.parse import urllib.request -from .exceptions import * +from .exceptions import RequirementError BASE_URL = 'https://archlinux.org/packages/search/json/?name={package}' BASE_GROUP_URL = 'https://archlinux.org/groups/x86_64/{group}/' diff --git a/archinstall/lib/profiles.py b/archinstall/lib/profiles.py index cd86ba1f..7d5373c5 100644 --- a/archinstall/lib/profiles.py +++ b/archinstall/lib/profiles.py @@ -13,6 +13,7 @@ from typing import Optional from .general import multisplit from .networking import list_interfaces from .storage import storage +from .exceptions import ProfileNotFound def grab_url_data(path): diff --git a/archinstall/lib/services.py b/archinstall/lib/services.py index 6f8f2a87..d295bdbb 100644 --- a/archinstall/lib/services.py +++ b/archinstall/lib/services.py @@ -1,4 +1,5 @@ -from .general import * +import os +from .general import SysCommand def service_state(service_name: str): diff --git a/archinstall/lib/user_interaction.py b/archinstall/lib/user_interaction.py index ba6259b1..6b142288 100644 --- a/archinstall/lib/user_interaction.py +++ b/archinstall/lib/user_interaction.py @@ -10,14 +10,13 @@ import sys import time from .disk import BlockDevice, valid_fs_type, find_partition_by_mountpoint, suggest_single_disk_layout, suggest_multi_disk_layout, valid_parted_position -from .exceptions import * -from .general import SysCommand +from .exceptions import RequirementError, UserError, DiskError from .hardware import AVAILABLE_GFX_DRIVERS, has_uefi, has_amd_graphics, has_intel_graphics, has_nvidia_graphics from .locale_helpers import list_keyboard_languages, verify_keyboard_layout, search_keyboard_layout from .networking import list_interfaces from .output import log from .profiles import Profile, list_profiles -from .storage import * +from .storage import storage # TODO: Some inconsistencies between the selection processes. # Some return the keys from the options, some the values? @@ -201,11 +200,11 @@ def select_encrypted_partitions(block_devices :dict, password :str) -> dict: return block_devices # TODO: Next version perhaps we can support multiple encrypted partitions - #options = [] - #for partition in block_devices.values(): - # options.append({key: val for key, val in partition.items() if val}) + # options = [] + # for partition in block_devices.values(): + # options.append({key: val for key, val in partition.items() if val}) - #print(generic_multi_select(options, f"Choose which partitions to encrypt (leave blank when done): ")) + # print(generic_multi_select(options, f"Choose which partitions to encrypt (leave blank when done): ")) class MiniCurses: def __init__(self, width, height): @@ -567,10 +566,10 @@ def get_default_partition_layout(block_devices): # TODO: Implement sane generic layout for 2+ drives def manage_new_and_existing_partitions(block_device :BlockDevice) -> dict: - if has_uefi(): - partition_type = 'gpt' - else: - partition_type = 'msdos' + # if has_uefi(): + # partition_type = 'gpt' + # else: + # partition_type = 'msdos' # log(f"Selecting which partitions to re-use on {block_device}...", fg="yellow", level=logging.INFO) # partitions = generic_multi_select(block_device.partitions.values(), "Select which partitions to re-use (the rest will be left alone): ", sort=True) @@ -600,7 +599,6 @@ def manage_new_and_existing_partitions(block_device :BlockDevice) -> dict: # return struct - mountpoints = {} block_device_struct = { "partitions" : [partition.__dump__() for partition in block_device.partitions.values()] } @@ -634,10 +632,10 @@ def manage_new_and_existing_partitions(block_device :BlockDevice) -> dict: break if task == 'Create a new partition': - if partition_type == 'gpt': - # https://www.gnu.org/software/parted/manual/html_node/mkpart.html - # https://www.gnu.org/software/parted/manual/html_node/mklabel.html - name = input("Enter a desired name for the partition: ").strip() + # if partition_type == 'gpt': + # # https://www.gnu.org/software/parted/manual/html_node/mkpart.html + # # https://www.gnu.org/software/parted/manual/html_node/mklabel.html + # name = input("Enter a desired name for the partition: ").strip() fstype = input("Enter a desired filesystem type for the partition: ").strip() @@ -674,7 +672,7 @@ def manage_new_and_existing_partitions(block_device :BlockDevice) -> dict: if input(f"{block_device} contains queued partitions, this will remove those, are you sure? y/N: ").strip().lower() in ('', 'n'): continue - block_device_struct.update( suggest_single_disk_layout(block_device)[block_device.path] ) + block_device_struct.update(suggest_single_disk_layout(block_device)[block_device.path]) elif task is None: return block_device_struct else: -- cgit v1.2.3-54-g00ecf