From 3ed8db5ef0b065583e35f924f3f9d09290f03a37 Mon Sep 17 00:00:00 2001 From: Anton Hvornum Date: Wed, 8 Jul 2020 10:22:25 +0000 Subject: Added support for .py profiles. Added a simple 'desktop.py' for now that is just a mock to make sure it's working. --- archinstall/lib/profiles.py | 219 +++++++++++++++++++++----------------------- 1 file changed, 105 insertions(+), 114 deletions(-) (limited to 'archinstall/lib/profiles.py') diff --git a/archinstall/lib/profiles.py b/archinstall/lib/profiles.py index 83f217d5..bccdf34d 100644 --- a/archinstall/lib/profiles.py +++ b/archinstall/lib/profiles.py @@ -1,4 +1,5 @@ import os, urllib.request, urllib.parse, ssl, json +import importlib.util, sys from collections import OrderedDict from .general import multisplit, sys_command, log from .exceptions import * @@ -13,49 +14,19 @@ def grab_url_data(path): response = urllib.request.urlopen(safe_path, context=ssl_context) return response.read() -def get_application_instructions(target): - instructions = {} - - for path in ['./', './profiles', '/etc/archinstall', '/etc/archinstall/profiles']: - if os.path.isfile(f'{path}/applications/{target}.json'): - return os.path.abspath(f'{path}/{self.name}.json') - - try: - if (cache := grab_url_data(f'{UPSTREAM_URL}/{self.name}.json')): - self._cache = cache - return f'{UPSTREAM_URL}/{self.name}.json' - except urllib.error.HTTPError: - pass - try: - if (cache := grab_url_data(f'{UPSTREAM_URL}/applications/{self.name}.json')): - self._cache = cache - return f'{UPSTREAM_URL}/applications/{self.name}.json' - except urllib.error.HTTPError: - pass - - try: - instructions = grab_url_data(f'{UPSTREAM_URL}/applications/{target}.json').decode('UTF-8') - log('[N] Found application instructions for: {}'.format(target)) - except urllib.error.HTTPError: - log('[N] Could not find remote instructions. yrying local instructions under ./profiles/applications') - local_path = './profiles/applications' if os.path.isfile('./archinstall.py') else './archinstall/profiles/applications' # Dangerous assumption - if os.path.isfile(f'{local_path}/{target}.json'): - with open(f'{local_path}/{target}.json', 'r') as fh: - instructions = fh.read() - - log('[N] Found local application instructions for: {}'.format(target)) - else: - log('[N] No instructions found for: {}'.format(target)) - return instructions - - try: - instructions = json.loads(instructions, object_pairs_hook=oDict) - except: - log('[E] JSON syntax error in {}'.format('{}/applications/{}.json'.format(args['profiles-path'], target))) - traceback.print_exc() - exit(1) - - return instructions +class Imported(): + def __init__(self, spec, imported): + self.spec = spec + self.imported = imported + + def __enter__(self, *args, **kwargs): + self.spec.loader.exec_module(self.imported) + return self + + def __exit__(self, *args, **kwargs): + # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager + if len(args) >= 2 and args[1]: + raise args[1] class Profile(): def __init__(self, installer, name, args={}): @@ -69,10 +40,18 @@ class Profile(): @property def path(self, *args, **kwargs): - for path in ['./', './profiles', '/etc/archinstall', '/etc/archinstall/profiles']: + for path in ['./profiles', '/etc/archinstall', '/etc/archinstall/profiles', os.path.abspath(f'{os.path.dirname(__file__)}/../profiles')]: # Step out of /lib if os.path.isfile(f'{path}/{self.name}.json'): return os.path.abspath(f'{path}/{self.name}.json') + elif os.path.isfile(f'{path}/{self.name}.py'): + return os.path.abspath(f'{path}/{self.name}.py') + try: + if (cache := grab_url_data(f'{UPSTREAM_URL}/{self.name}.py')): + self._cache = cache + return f'{UPSTREAM_URL}/{self.name}.py' + except urllib.error.HTTPError: + pass try: if (cache := grab_url_data(f'{UPSTREAM_URL}/{self.name}.json')): self._cache = cache @@ -88,9 +67,17 @@ class Profile(): return None + def py_exec_mock(self): + spec.loader.exec_module(imported) + def load_instructions(self): if (absolute_path := self.path): - if absolute_path[:4] == 'http': + if os.path.splitext(absolute_path)[1] == '.py': + spec = importlib.util.spec_from_file_location(absolute_path, absolute_path) + imported = importlib.util.module_from_spec(spec) + sys.modules[os.path.basename(absolute_path)] = imported + return Imported(spec, imported) + elif absolute_path[:4] == 'http': return json.loads(self._cache) with open(absolute_path, 'r') as fh: @@ -100,77 +87,81 @@ class Profile(): def install(self): instructions = self.load_instructions() - if 'args' in instructions: - self.args = instructions['args'] - if 'post' in instructions: - instructions = instructions['post'] - - for title in instructions: - log(f'Running post installation step {title}') - - log('[N] Network Deploy: {}'.format(title)) - if type(instructions[title]) == str: - log('[N] Loading {} configuration'.format(instructions[title])) - log(f'Loading {instructions[title]} configuration') - instructions[title] = Application(self.installer, instructions[title], args=self.args) - instructions[title].install() - else: - for command in instructions[title]: - raw_command = command - opts = instructions[title][command] if type(instructions[title][command]) in (dict, OrderedDict) else {} - if len(opts): - if 'pass-args' in opts or 'format' in opts: + if type(instructions) == Imported: + with instructions as runtime: + log(f'Profile {self.name} finished successfully.') + else: + if 'args' in instructions: + self.args = instructions['args'] + if 'post' in instructions: + instructions = instructions['post'] + + for title in instructions: + log(f'Running post installation step {title}') + + log('[N] Network Deploy: {}'.format(title)) + if type(instructions[title]) == str: + log('[N] Loading {} configuration'.format(instructions[title])) + log(f'Loading {instructions[title]} configuration') + instructions[title] = Application(self.installer, instructions[title], args=self.args) + instructions[title].install() + else: + for command in instructions[title]: + raw_command = command + opts = instructions[title][command] if type(instructions[title][command]) in (dict, OrderedDict) else {} + if len(opts): + if 'pass-args' in opts or 'format' in opts: + command = command.format(**self.args) + ## FIXME: Instead of deleting the two options + ## in order to mute command output further down, + ## check for a 'debug' flag per command and delete these two + if 'pass-args' in opts: + del(opts['pass-args']) + elif 'format' in opts: + del(opts['format']) + + if 'pass-args' in opts and opts['pass-args']: command = command.format(**self.args) - ## FIXME: Instead of deleting the two options - ## in order to mute command output further down, - ## check for a 'debug' flag per command and delete these two - if 'pass-args' in opts: - del(opts['pass-args']) - elif 'format' in opts: - del(opts['format']) - - if 'pass-args' in opts and opts['pass-args']: - command = command.format(**self.args) - - if 'runas' in opts and f'su - {opts["runas"]} -c' not in command: - command = command.replace('"', '\\"') - command = f'su - {opts["runas"]} -c "{command}"' - - if 'no-chroot' in opts and opts['no-chroot']: - log(f'Executing {command} as simple command from live-cd.') - o = sys_command(command, opts) - elif 'chroot' in opts and opts['chroot']: - log(f'Executing {command} in chroot.') - ## Run in a manually set up version of arch-chroot (arch-chroot will break namespaces). - ## This is a bit risky in case the file systems changes over the years, but we'll probably be safe adding this as an option. - ## **> Prefer if possible to use 'no-chroot' instead which "live boots" the OS and runs the command. - o = sys_command(f"mount /dev/mapper/luksdev {self.installer.mountpoint}") - o = sys_command(f"cd {self.installer.mountpoint}; cp /etc/resolv.conf etc") - o = sys_command(f"cd {self.installer.mountpoint}; mount -t proc /proc proc") - o = sys_command(f"cd {self.installer.mountpoint}; mount --make-rslave --rbind /sys sys") - o = sys_command(f"cd {self.installer.mountpoint}; mount --make-rslave --rbind /dev dev") - o = sys_command(f'chroot {self.installer.mountpoint} /bin/bash -c "{command}"') - o = sys_command(f"cd {self.installer.mountpoint}; umount -R dev") - o = sys_command(f"cd {self.installer.mountpoint}; umount -R sys") - o = sys_command(f"cd {self.installer.mountpoint}; umount -R proc") - else: - if 'boot' in opts and opts['boot']: - log(f'Executing {command} in boot mode.') - defaults = { - 'login:' : 'root\n', - 'Password:' : self.args['password']+'\n', - f'[root@{self.args["hostname"]} ~]#' : command+'\n', - } - if not 'events' in opts: opts['events'] = {} - events = {**defaults, **opts['events']} - del(opts['events']) - o = b''.join(sys_command(f'/usr/bin/systemd-nspawn -D {self.installer.mountpoint} -b --machine temporary', events=events)) + + if 'runas' in opts and f'su - {opts["runas"]} -c' not in command: + command = command.replace('"', '\\"') + command = f'su - {opts["runas"]} -c "{command}"' + + if 'no-chroot' in opts and opts['no-chroot']: + log(f'Executing {command} as simple command from live-cd.') + o = sys_command(command, opts) + elif 'chroot' in opts and opts['chroot']: + log(f'Executing {command} in chroot.') + ## Run in a manually set up version of arch-chroot (arch-chroot will break namespaces). + ## This is a bit risky in case the file systems changes over the years, but we'll probably be safe adding this as an option. + ## **> Prefer if possible to use 'no-chroot' instead which "live boots" the OS and runs the command. + o = sys_command(f"mount /dev/mapper/luksdev {self.installer.mountpoint}") + o = sys_command(f"cd {self.installer.mountpoint}; cp /etc/resolv.conf etc") + o = sys_command(f"cd {self.installer.mountpoint}; mount -t proc /proc proc") + o = sys_command(f"cd {self.installer.mountpoint}; mount --make-rslave --rbind /sys sys") + o = sys_command(f"cd {self.installer.mountpoint}; mount --make-rslave --rbind /dev dev") + o = sys_command(f'chroot {self.installer.mountpoint} /bin/bash -c "{command}"') + o = sys_command(f"cd {self.installer.mountpoint}; umount -R dev") + o = sys_command(f"cd {self.installer.mountpoint}; umount -R sys") + o = sys_command(f"cd {self.installer.mountpoint}; umount -R proc") else: - log(f'Executing {command} in with systemd-nspawn without boot.') - o = b''.join(sys_command(f'/usr/bin/systemd-nspawn -D {self.installer.mountpoint} --machine temporary {command}')) - if type(instructions[title][raw_command]) == bytes and len(instructions['post'][title][raw_command]) and not instructions['post'][title][raw_command] in o: - log(f'{command} failed: {o.decode("UTF-8")}') - log('[W] Post install command failed: {}'.format(o.decode('UTF-8'))) + if 'boot' in opts and opts['boot']: + log(f'Executing {command} in boot mode.') + defaults = { + 'login:' : 'root\n', + 'Password:' : self.args['password']+'\n', + f'[root@{self.args["hostname"]} ~]#' : command+'\n', + } + if not 'events' in opts: opts['events'] = {} + events = {**defaults, **opts['events']} + del(opts['events']) + o = b''.join(sys_command(f'/usr/bin/systemd-nspawn -D {self.installer.mountpoint} -b --machine temporary', events=events)) + else: + log(f'Executing {command} in with systemd-nspawn without boot.') + o = b''.join(sys_command(f'/usr/bin/systemd-nspawn -D {self.installer.mountpoint} --machine temporary {command}')) + if type(instructions[title][raw_command]) == bytes and len(instructions['post'][title][raw_command]) and not instructions['post'][title][raw_command] in o: + log(f'{command} failed: {o.decode("UTF-8")}') + log('[W] Post install command failed: {}'.format(o.decode('UTF-8'))) class Application(Profile): @property -- cgit v1.2.3-54-g00ecf