From c08520f9902aeb1b4ce22e1159060792081b0327 Mon Sep 17 00:00:00 2001 From: Anton Hvornum Date: Wed, 2 Feb 2022 14:22:52 +0100 Subject: SysCommand() to remove ANSII VT100 Esc codes & archlinux-keyring fix (#933) * Fixed SysCommandWorker() so that it removes ANSII VT100 escape codes. I also moved package.py into it's own folder, as that's something I want to expand on a lot, so package related stuff should go in there. I created a installed_package() function which gets information about the locally installed package. I changed so that find_packages() and find_package() returns a data-model instead for the package information. This should unify and make sure we detect issues down the line. * Working on structuring .version constructor that works with BaseModel * Added version contructors to VersionDef(). Also added __eq__ and __lt__ to LocalPackage() and PackageSearchResult(). * removed debug and added a TODO * Removed whitespace * Removed mirror-database function from myrepo --- archinstall/lib/general.py | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) (limited to 'archinstall/lib/general.py') diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py index a3976234..a5444801 100644 --- a/archinstall/lib/general.py +++ b/archinstall/lib/general.py @@ -9,6 +9,7 @@ import subprocess import string import sys import time +import re from datetime import datetime, date from typing import Callable, Optional, Dict, Any, List, Union, Iterator, TYPE_CHECKING # https://stackoverflow.com/a/39757388/929999 @@ -81,6 +82,18 @@ def locate_binary(name :str) -> str: raise RequirementError(f"Binary {name} does not exist.") +def clear_vt100_escape_codes(data :Union[bytes, str]): + # https://stackoverflow.com/a/43627833/929999 + if type(data) == bytes: + vt100_escape_regex = bytes(r'\x1B\[[?0-9;]*[a-zA-Z]', 'UTF-8') + else: + vt100_escape_regex = r'\x1B\[[?0-9;]*[a-zA-Z]' + + for match in re.findall(vt100_escape_regex, data, re.IGNORECASE): + data = data.replace(match, '' if type(data) == str else b'') + + return data + def json_dumps(*args :str, **kwargs :str) -> str: return json.dumps(*args, **{**kwargs, 'cls': JSON}) @@ -168,7 +181,8 @@ class SysCommandWorker: peak_output :Optional[bool] = False, environment_vars :Optional[Dict[str, Any]] = None, logfile :Optional[None] = None, - working_directory :Optional[str] = './'): + working_directory :Optional[str] = './', + remove_vt100_escape_codes_from_lines :bool = True): if not callbacks: callbacks = {} @@ -200,6 +214,7 @@ class SysCommandWorker: self.child_fd :Optional[int] = None self.started :Optional[float] = None self.ended :Optional[float] = None + self.remove_vt100_escape_codes_from_lines :bool = remove_vt100_escape_codes_from_lines def __contains__(self, key: bytes) -> bool: """ @@ -216,6 +231,9 @@ class SysCommandWorker: def __iter__(self, *args :str, **kwargs :Dict[str, Any]) -> Iterator[bytes]: for line in self._trace_log[self._trace_log_pos:self._trace_log.rfind(b'\n')].split(b'\n'): if line: + if self.remove_vt100_escape_codes_from_lines: + line = clear_vt100_escape_codes(line) + yield line + b'\n' self._trace_log_pos = self._trace_log.rfind(b'\n') @@ -368,7 +386,8 @@ class SysCommand: start_callback :Optional[Callable[[Any], Any]] = None, peak_output :Optional[bool] = False, environment_vars :Optional[Dict[str, Any]] = None, - working_directory :Optional[str] = './'): + working_directory :Optional[str] = './', + remove_vt100_escape_codes_from_lines :bool = True): _callbacks = {} if callbacks: @@ -382,6 +401,7 @@ class SysCommand: self.peak_output = peak_output self.environment_vars = environment_vars self.working_directory = working_directory + self.remove_vt100_escape_codes_from_lines = remove_vt100_escape_codes_from_lines self.session :Optional[SysCommandWorker] = None self.create_session() @@ -435,7 +455,7 @@ class SysCommand: if self.session: return self.session - with SysCommandWorker(self.cmd, callbacks=self._callbacks, peak_output=self.peak_output, environment_vars=self.environment_vars) as session: + with SysCommandWorker(self.cmd, callbacks=self._callbacks, peak_output=self.peak_output, environment_vars=self.environment_vars, remove_vt100_escape_codes_from_lines=self.remove_vt100_escape_codes_from_lines) as session: if not self.session: self.session = session -- cgit v1.2.3-54-g00ecf