Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/general.py
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/general.py')
-rw-r--r--archinstall/lib/general.py55
1 files changed, 26 insertions, 29 deletions
diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py
index 57f13288..997b7d67 100644
--- a/archinstall/lib/general.py
+++ b/archinstall/lib/general.py
@@ -19,9 +19,15 @@ import pathlib
from datetime import datetime, date
from typing import Callable, Optional, Dict, Any, List, Union, Iterator, TYPE_CHECKING
+from .exceptions import RequirementError, SysCallError
+from .output import log
+from .storage import storage
+
+
if TYPE_CHECKING:
from .installer import Installer
+
if sys.platform == 'linux':
from select import epoll, EPOLLIN, EPOLLHUP
else:
@@ -53,30 +59,15 @@ else:
except OSError:
return []
-from .exceptions import RequirementError, SysCallError
-from .output import log
-from .storage import storage
def gen_uid(entropy_length :int = 256) -> str:
return hashlib.sha512(os.urandom(entropy_length)).hexdigest()
+
def generate_password(length :int = 64) -> str:
haystack = string.printable # digits, ascii_letters, punctiation (!"#$[] etc) and whitespace
return ''.join(secrets.choice(haystack) for i in range(length))
-def multisplit(s :str, splitters :List[str]) -> str:
- s = [s, ]
- for key in splitters:
- ns = []
- for obj in s:
- x = obj.split(key)
- for index, part in enumerate(x):
- if len(part):
- ns.append(part)
- if index < len(x) - 1:
- ns.append(key)
- s = ns
- return s
def locate_binary(name :str) -> str:
for PATH in os.environ['PATH'].split(':'):
@@ -88,20 +79,20 @@ def locate_binary(name :str) -> str:
raise RequirementError(f"Binary {name} does not exist.")
-def clear_vt100_escape_codes(data :Union[bytes, str]):
+
+def clear_vt100_escape_codes(data :Union[bytes, str]) -> Union[bytes, str]:
# https://stackoverflow.com/a/43627833/929999
if type(data) == bytes:
- vt100_escape_regex = bytes(r'\x1B\[[?0-9;]*[a-zA-Z]', 'UTF-8')
- else:
+ byte_vt100_escape_regex = bytes(r'\x1B\[[?0-9;]*[a-zA-Z]', 'UTF-8')
+ data = re.sub(byte_vt100_escape_regex, b'', data)
+ elif type(data) == str:
vt100_escape_regex = r'\x1B\[[?0-9;]*[a-zA-Z]'
-
- for match in re.findall(vt100_escape_regex, data, re.IGNORECASE):
- data = data.replace(match, '' if type(data) == str else b'')
+ data = re.sub(vt100_escape_regex, '', data)
+ else:
+ raise ValueError(f'Unsupported data type: {type(data)}')
return data
-def json_dumps(*args :str, **kwargs :str) -> str:
- return json.dumps(*args, **{**kwargs, 'cls': JSON})
class JsonEncoder:
@staticmethod
@@ -245,10 +236,12 @@ class SysCommandWorker:
def __iter__(self, *args :str, **kwargs :Dict[str, Any]) -> Iterator[bytes]:
for line in self._trace_log[self._trace_log_pos:self._trace_log.rfind(b'\n')].split(b'\n'):
if line:
+ escaped_line: bytes = line
+
if self.remove_vt100_escape_codes_from_lines:
- line = clear_vt100_escape_codes(line)
+ escaped_line = clear_vt100_escape_codes(line) # type: ignore
- yield line + b'\n'
+ yield escaped_line + b'\n'
self._trace_log_pos = self._trace_log.rfind(b'\n')
@@ -279,7 +272,11 @@ class SysCommandWorker:
log(args[1], level=logging.DEBUG, fg='red')
if self.exit_code != 0:
- raise SysCallError(f"{self.cmd} exited with abnormal exit code [{self.exit_code}]: {self._trace_log[-500:]}", self.exit_code, worker=self)
+ raise SysCallError(
+ f"{self.cmd} exited with abnormal exit code [{self.exit_code}]: {str(self._trace_log[-500:])}",
+ self.exit_code,
+ worker=self
+ )
def is_alive(self) -> bool:
self.poll()
@@ -328,7 +325,7 @@ class SysCommandWorker:
change_perm = True
with peak_logfile.open("a") as peek_output_log:
- peek_output_log.write(output)
+ peek_output_log.write(str(output))
if change_perm:
os.chmod(str(peak_logfile), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
@@ -497,7 +494,7 @@ class SysCommand:
clears any printed output if ``.peek_output=True``.
"""
if self.session:
- return self.session
+ return True
with SysCommandWorker(
self.cmd,