Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/general.py
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/general.py')
-rw-r--r--archinstall/lib/general.py88
1 files changed, 20 insertions, 68 deletions
diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py
index 997b7d67..777ee90e 100644
--- a/archinstall/lib/general.py
+++ b/archinstall/lib/general.py
@@ -1,8 +1,6 @@
from __future__ import annotations
-import hashlib
import json
-import logging
import os
import secrets
import shlex
@@ -18,9 +16,10 @@ import urllib.error
import pathlib
from datetime import datetime, date
from typing import Callable, Optional, Dict, Any, List, Union, Iterator, TYPE_CHECKING
+from select import epoll, EPOLLIN, EPOLLHUP
from .exceptions import RequirementError, SysCallError
-from .output import log
+from .output import debug, error, info
from .storage import storage
@@ -28,42 +27,6 @@ if TYPE_CHECKING:
from .installer import Installer
-if sys.platform == 'linux':
- from select import epoll, EPOLLIN, EPOLLHUP
-else:
- import select
- EPOLLIN = 0
- EPOLLHUP = 0
-
- class epoll():
- """ #!if windows
- Create a epoll() implementation that simulates the epoll() behavior.
- This so that the rest of the code doesn't need to worry weither we're using select() or epoll().
- """
- def __init__(self) -> None:
- self.sockets: Dict[str, Any] = {}
- self.monitoring: Dict[int, Any] = {}
-
- def unregister(self, fileno :int, *args :List[Any], **kwargs :Dict[str, Any]) -> None:
- try:
- del(self.monitoring[fileno]) # noqa: E275
- except:
- pass
-
- def register(self, fileno :int, *args :int, **kwargs :Dict[str, Any]) -> None:
- self.monitoring[fileno] = True
-
- def poll(self, timeout: float = 0.05, *args :str, **kwargs :Dict[str, Any]) -> List[Any]:
- try:
- return [[fileno, 1] for fileno in select.select(list(self.monitoring.keys()), [], [], timeout)[0]]
- except OSError:
- return []
-
-
-def gen_uid(entropy_length :int = 256) -> str:
- return hashlib.sha512(os.urandom(entropy_length)).hexdigest()
-
-
def generate_password(length :int = 64) -> str:
haystack = string.printable # digits, ascii_letters, punctiation (!"#$[] etc) and whitespace
return ''.join(secrets.choice(haystack) for i in range(length))
@@ -156,6 +119,7 @@ class JsonEncoder:
else:
return JsonEncoder._encode(obj)
+
class JSON(json.JSONEncoder, json.JSONDecoder):
"""
A safe JSON encoder that will omit private information in dicts (starting with !)
@@ -166,6 +130,7 @@ class JSON(json.JSONEncoder, json.JSONDecoder):
def encode(self, obj :Any) -> Any:
return super(JSON, self).encode(self._encode(obj))
+
class UNSAFE_JSON(json.JSONEncoder, json.JSONDecoder):
"""
UNSAFE_JSON will call/encode and keep private information in dicts (starting with !)
@@ -269,7 +234,7 @@ class SysCommandWorker:
sys.stdout.flush()
if len(args) >= 2 and args[1]:
- log(args[1], level=logging.DEBUG, fg='red')
+ debug(args[1])
if self.exit_code != 0:
raise SysCallError(
@@ -350,7 +315,7 @@ class SysCommandWorker:
self.ended = time.time()
break
- if self.ended or (got_output is False and pid_exists(self.pid) is False):
+ if self.ended or (got_output is False and _pid_exists(self.pid) is False):
self.ended = time.time()
try:
wait_status = os.waitpid(self.pid, 0)[1]
@@ -396,15 +361,15 @@ class SysCommandWorker:
pass
except Exception as e:
exception_type = type(e).__name__
- log(f"Unexpected {exception_type} occurred in {self.cmd}: {e}", level=logging.ERROR)
+ error(f"Unexpected {exception_type} occurred in {self.cmd}: {e}")
raise e
os.execve(self.cmd[0], list(self.cmd), {**os.environ, **self.environment_vars})
if storage['arguments'].get('debug'):
- log(f"Executing: {self.cmd}", level=logging.DEBUG)
+ debug(f"Executing: {self.cmd}")
except FileNotFoundError:
- log(f"{self.cmd[0]} does not exist.", level=logging.ERROR, fg="red")
+ error(f"{self.cmd[0]} does not exist.")
self.exit_code = 1
return False
else:
@@ -455,7 +420,7 @@ class SysCommand:
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]:
- log(args[1], level=logging.ERROR, fg='red')
+ error(args[1])
def __iter__(self, *args :List[Any], **kwargs :Dict[str, Any]) -> Iterator[bytes]:
if self.session:
@@ -535,22 +500,7 @@ class SysCommand:
return None
-def prerequisite_check() -> bool:
- """
- This function is used as a safety check before
- continuing with an installation.
-
- Could be anything from checking that /boot is big enough
- to check if nvidia hardware exists when nvidia driver was chosen.
- """
-
- return True
-
-def reboot():
- SysCommand("/usr/bin/reboot")
-
-
-def pid_exists(pid: int) -> bool:
+def _pid_exists(pid: int) -> bool:
try:
return any(subprocess.check_output(['/usr/bin/ps', '--no-headers', '-o', 'pid', '-p', str(pid)]).strip())
except subprocess.CalledProcessError:
@@ -559,7 +509,7 @@ def pid_exists(pid: int) -> bool:
def run_custom_user_commands(commands :List[str], installation :Installer) -> None:
for index, command in enumerate(commands):
- log(f'Executing custom command "{command}" ...', level=logging.INFO)
+ info(f'Executing custom command "{command}" ...')
with open(f"{installation.target}/var/tmp/user-command.{index}.sh", "w") as temp_script:
temp_script.write(command)
@@ -568,6 +518,7 @@ def run_custom_user_commands(commands :List[str], installation :Installer) -> No
os.unlink(f"{installation.target}/var/tmp/user-command.{index}.sh")
+
def json_stream_to_structure(configuration_identifier : str, stream :str, target :dict) -> bool :
"""
Function to load a stream (file (as name) or valid JSON string into an existing dictionary
@@ -582,16 +533,16 @@ def json_stream_to_structure(configuration_identifier : str, stream :str, target
try:
with urllib.request.urlopen(urllib.request.Request(stream, headers={'User-Agent': 'ArchInstall'})) as response:
target.update(json.loads(response.read()))
- except urllib.error.HTTPError as error:
- log(f"Could not load {configuration_identifier} via {parsed_url} due to: {error}", level=logging.ERROR, fg="red")
+ except urllib.error.HTTPError as err:
+ error(f"Could not load {configuration_identifier} via {parsed_url} due to: {err}")
return False
else:
if pathlib.Path(stream).exists():
try:
with pathlib.Path(stream).open() as fh:
target.update(json.load(fh))
- except Exception as error:
- log(f"{configuration_identifier} = {stream} does not contain a valid JSON format: {error}", level=logging.ERROR, fg="red")
+ except Exception as err:
+ error(f"{configuration_identifier} = {stream} does not contain a valid JSON format: {err}")
return False
else:
# NOTE: This is a rudimentary check if what we're trying parse is a dict structure.
@@ -600,14 +551,15 @@ def json_stream_to_structure(configuration_identifier : str, stream :str, target
try:
target.update(json.loads(stream))
except Exception as e:
- log(f" {configuration_identifier} Contains an invalid JSON format : {e}",level=logging.ERROR, fg="red")
+ error(f"{configuration_identifier} Contains an invalid JSON format: {e}")
return False
else:
- log(f" {configuration_identifier} is neither a file nor is a JSON string:",level=logging.ERROR, fg="red")
+ error(f"{configuration_identifier} is neither a file nor is a JSON string")
return False
return True
+
def secret(x :str):
""" return * with len equal to to the input string """
return '*' * len(x)