From 89cefb9a1c7d4c4968e7d8645149078e601c9d1c Mon Sep 17 00:00:00 2001 From: Daniel Girtler Date: Fri, 12 May 2023 02:30:09 +1000 Subject: Cleanup imports and unused code (#1801) * Cleanup imports and unused code * Update build check * Keep deprecation exception * Simplify logging --------- Co-authored-by: Daniel Girtler --- archinstall/lib/general.py | 88 +++++++++++----------------------------------- 1 file changed, 20 insertions(+), 68 deletions(-) (limited to 'archinstall/lib/general.py') diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py index 997b7d67..777ee90e 100644 --- a/archinstall/lib/general.py +++ b/archinstall/lib/general.py @@ -1,8 +1,6 @@ from __future__ import annotations -import hashlib import json -import logging import os import secrets import shlex @@ -18,9 +16,10 @@ import urllib.error import pathlib from datetime import datetime, date from typing import Callable, Optional, Dict, Any, List, Union, Iterator, TYPE_CHECKING +from select import epoll, EPOLLIN, EPOLLHUP from .exceptions import RequirementError, SysCallError -from .output import log +from .output import debug, error, info from .storage import storage @@ -28,42 +27,6 @@ if TYPE_CHECKING: from .installer import Installer -if sys.platform == 'linux': - from select import epoll, EPOLLIN, EPOLLHUP -else: - import select - EPOLLIN = 0 - EPOLLHUP = 0 - - class epoll(): - """ #!if windows - Create a epoll() implementation that simulates the epoll() behavior. - This so that the rest of the code doesn't need to worry weither we're using select() or epoll(). - """ - def __init__(self) -> None: - self.sockets: Dict[str, Any] = {} - self.monitoring: Dict[int, Any] = {} - - def unregister(self, fileno :int, *args :List[Any], **kwargs :Dict[str, Any]) -> None: - try: - del(self.monitoring[fileno]) # noqa: E275 - except: - pass - - def register(self, fileno :int, *args :int, **kwargs :Dict[str, Any]) -> None: - self.monitoring[fileno] = True - - def poll(self, timeout: float = 0.05, *args :str, **kwargs :Dict[str, Any]) -> List[Any]: - try: - return [[fileno, 1] for fileno in select.select(list(self.monitoring.keys()), [], [], timeout)[0]] - except OSError: - return [] - - -def gen_uid(entropy_length :int = 256) -> str: - return hashlib.sha512(os.urandom(entropy_length)).hexdigest() - - def generate_password(length :int = 64) -> str: haystack = string.printable # digits, ascii_letters, punctiation (!"#$[] etc) and whitespace return ''.join(secrets.choice(haystack) for i in range(length)) @@ -156,6 +119,7 @@ class JsonEncoder: else: return JsonEncoder._encode(obj) + class JSON(json.JSONEncoder, json.JSONDecoder): """ A safe JSON encoder that will omit private information in dicts (starting with !) @@ -166,6 +130,7 @@ class JSON(json.JSONEncoder, json.JSONDecoder): def encode(self, obj :Any) -> Any: return super(JSON, self).encode(self._encode(obj)) + class UNSAFE_JSON(json.JSONEncoder, json.JSONDecoder): """ UNSAFE_JSON will call/encode and keep private information in dicts (starting with !) @@ -269,7 +234,7 @@ class SysCommandWorker: sys.stdout.flush() if len(args) >= 2 and args[1]: - log(args[1], level=logging.DEBUG, fg='red') + debug(args[1]) if self.exit_code != 0: raise SysCallError( @@ -350,7 +315,7 @@ class SysCommandWorker: self.ended = time.time() break - if self.ended or (got_output is False and pid_exists(self.pid) is False): + if self.ended or (got_output is False and _pid_exists(self.pid) is False): self.ended = time.time() try: wait_status = os.waitpid(self.pid, 0)[1] @@ -396,15 +361,15 @@ class SysCommandWorker: pass except Exception as e: exception_type = type(e).__name__ - log(f"Unexpected {exception_type} occurred in {self.cmd}: {e}", level=logging.ERROR) + error(f"Unexpected {exception_type} occurred in {self.cmd}: {e}") raise e os.execve(self.cmd[0], list(self.cmd), {**os.environ, **self.environment_vars}) if storage['arguments'].get('debug'): - log(f"Executing: {self.cmd}", level=logging.DEBUG) + debug(f"Executing: {self.cmd}") except FileNotFoundError: - log(f"{self.cmd[0]} does not exist.", level=logging.ERROR, fg="red") + error(f"{self.cmd[0]} does not exist.") self.exit_code = 1 return False else: @@ -455,7 +420,7 @@ class SysCommand: # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager if len(args) >= 2 and args[1]: - log(args[1], level=logging.ERROR, fg='red') + error(args[1]) def __iter__(self, *args :List[Any], **kwargs :Dict[str, Any]) -> Iterator[bytes]: if self.session: @@ -535,22 +500,7 @@ class SysCommand: return None -def prerequisite_check() -> bool: - """ - This function is used as a safety check before - continuing with an installation. - - Could be anything from checking that /boot is big enough - to check if nvidia hardware exists when nvidia driver was chosen. - """ - - return True - -def reboot(): - SysCommand("/usr/bin/reboot") - - -def pid_exists(pid: int) -> bool: +def _pid_exists(pid: int) -> bool: try: return any(subprocess.check_output(['/usr/bin/ps', '--no-headers', '-o', 'pid', '-p', str(pid)]).strip()) except subprocess.CalledProcessError: @@ -559,7 +509,7 @@ def pid_exists(pid: int) -> bool: def run_custom_user_commands(commands :List[str], installation :Installer) -> None: for index, command in enumerate(commands): - log(f'Executing custom command "{command}" ...', level=logging.INFO) + info(f'Executing custom command "{command}" ...') with open(f"{installation.target}/var/tmp/user-command.{index}.sh", "w") as temp_script: temp_script.write(command) @@ -568,6 +518,7 @@ def run_custom_user_commands(commands :List[str], installation :Installer) -> No os.unlink(f"{installation.target}/var/tmp/user-command.{index}.sh") + def json_stream_to_structure(configuration_identifier : str, stream :str, target :dict) -> bool : """ Function to load a stream (file (as name) or valid JSON string into an existing dictionary @@ -582,16 +533,16 @@ def json_stream_to_structure(configuration_identifier : str, stream :str, target try: with urllib.request.urlopen(urllib.request.Request(stream, headers={'User-Agent': 'ArchInstall'})) as response: target.update(json.loads(response.read())) - except urllib.error.HTTPError as error: - log(f"Could not load {configuration_identifier} via {parsed_url} due to: {error}", level=logging.ERROR, fg="red") + except urllib.error.HTTPError as err: + error(f"Could not load {configuration_identifier} via {parsed_url} due to: {err}") return False else: if pathlib.Path(stream).exists(): try: with pathlib.Path(stream).open() as fh: target.update(json.load(fh)) - except Exception as error: - log(f"{configuration_identifier} = {stream} does not contain a valid JSON format: {error}", level=logging.ERROR, fg="red") + except Exception as err: + error(f"{configuration_identifier} = {stream} does not contain a valid JSON format: {err}") return False else: # NOTE: This is a rudimentary check if what we're trying parse is a dict structure. @@ -600,14 +551,15 @@ def json_stream_to_structure(configuration_identifier : str, stream :str, target try: target.update(json.loads(stream)) except Exception as e: - log(f" {configuration_identifier} Contains an invalid JSON format : {e}",level=logging.ERROR, fg="red") + error(f"{configuration_identifier} Contains an invalid JSON format: {e}") return False else: - log(f" {configuration_identifier} is neither a file nor is a JSON string:",level=logging.ERROR, fg="red") + error(f"{configuration_identifier} is neither a file nor is a JSON string") return False return True + def secret(x :str): """ return * with len equal to to the input string """ return '*' * len(x) -- cgit v1.2.3-70-g09d2