Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib
diff options
context:
space:
mode:
authorAnton Hvornum <anton.feeds+github@gmail.com>2020-07-08 10:22:25 +0000
committerAnton Hvornum <anton.feeds+github@gmail.com>2020-07-08 10:22:25 +0000
commit3ed8db5ef0b065583e35f924f3f9d09290f03a37 (patch)
tree78e4fb4b8ab5b5532b18c7e4bf93fc825b4d0290 /archinstall/lib
parenta6956073265fe626ea9fa710215c7680d1e3c05c (diff)
Added support for .py profiles. Added a simple 'desktop.py' for now that is just a mock to make sure it's working.
Diffstat (limited to 'archinstall/lib')
-rw-r--r--archinstall/lib/profiles.py219
1 files changed, 105 insertions, 114 deletions
diff --git a/archinstall/lib/profiles.py b/archinstall/lib/profiles.py
index 83f217d5..bccdf34d 100644
--- a/archinstall/lib/profiles.py
+++ b/archinstall/lib/profiles.py
@@ -1,4 +1,5 @@
import os, urllib.request, urllib.parse, ssl, json
+import importlib.util, sys
from collections import OrderedDict
from .general import multisplit, sys_command, log
from .exceptions import *
@@ -13,49 +14,19 @@ def grab_url_data(path):
response = urllib.request.urlopen(safe_path, context=ssl_context)
return response.read()
-def get_application_instructions(target):
- instructions = {}
-
- for path in ['./', './profiles', '/etc/archinstall', '/etc/archinstall/profiles']:
- if os.path.isfile(f'{path}/applications/{target}.json'):
- return os.path.abspath(f'{path}/{self.name}.json')
-
- try:
- if (cache := grab_url_data(f'{UPSTREAM_URL}/{self.name}.json')):
- self._cache = cache
- return f'{UPSTREAM_URL}/{self.name}.json'
- except urllib.error.HTTPError:
- pass
- try:
- if (cache := grab_url_data(f'{UPSTREAM_URL}/applications/{self.name}.json')):
- self._cache = cache
- return f'{UPSTREAM_URL}/applications/{self.name}.json'
- except urllib.error.HTTPError:
- pass
-
- try:
- instructions = grab_url_data(f'{UPSTREAM_URL}/applications/{target}.json').decode('UTF-8')
- log('[N] Found application instructions for: {}'.format(target))
- except urllib.error.HTTPError:
- log('[N] Could not find remote instructions. yrying local instructions under ./profiles/applications')
- local_path = './profiles/applications' if os.path.isfile('./archinstall.py') else './archinstall/profiles/applications' # Dangerous assumption
- if os.path.isfile(f'{local_path}/{target}.json'):
- with open(f'{local_path}/{target}.json', 'r') as fh:
- instructions = fh.read()
-
- log('[N] Found local application instructions for: {}'.format(target))
- else:
- log('[N] No instructions found for: {}'.format(target))
- return instructions
-
- try:
- instructions = json.loads(instructions, object_pairs_hook=oDict)
- except:
- log('[E] JSON syntax error in {}'.format('{}/applications/{}.json'.format(args['profiles-path'], target)))
- traceback.print_exc()
- exit(1)
-
- return instructions
+class Imported():
+ def __init__(self, spec, imported):
+ self.spec = spec
+ self.imported = imported
+
+ def __enter__(self, *args, **kwargs):
+ self.spec.loader.exec_module(self.imported)
+ return self
+
+ def __exit__(self, *args, **kwargs):
+ # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
+ if len(args) >= 2 and args[1]:
+ raise args[1]
class Profile():
def __init__(self, installer, name, args={}):
@@ -69,11 +40,19 @@ class Profile():
@property
def path(self, *args, **kwargs):
- for path in ['./', './profiles', '/etc/archinstall', '/etc/archinstall/profiles']:
+ for path in ['./profiles', '/etc/archinstall', '/etc/archinstall/profiles', os.path.abspath(f'{os.path.dirname(__file__)}/../profiles')]: # Step out of /lib
if os.path.isfile(f'{path}/{self.name}.json'):
return os.path.abspath(f'{path}/{self.name}.json')
+ elif os.path.isfile(f'{path}/{self.name}.py'):
+ return os.path.abspath(f'{path}/{self.name}.py')
try:
+ if (cache := grab_url_data(f'{UPSTREAM_URL}/{self.name}.py')):
+ self._cache = cache
+ return f'{UPSTREAM_URL}/{self.name}.py'
+ except urllib.error.HTTPError:
+ pass
+ try:
if (cache := grab_url_data(f'{UPSTREAM_URL}/{self.name}.json')):
self._cache = cache
return f'{UPSTREAM_URL}/{self.name}.json'
@@ -88,9 +67,17 @@ class Profile():
return None
+ def py_exec_mock(self):
+ spec.loader.exec_module(imported)
+
def load_instructions(self):
if (absolute_path := self.path):
- if absolute_path[:4] == 'http':
+ if os.path.splitext(absolute_path)[1] == '.py':
+ spec = importlib.util.spec_from_file_location(absolute_path, absolute_path)
+ imported = importlib.util.module_from_spec(spec)
+ sys.modules[os.path.basename(absolute_path)] = imported
+ return Imported(spec, imported)
+ elif absolute_path[:4] == 'http':
return json.loads(self._cache)
with open(absolute_path, 'r') as fh:
@@ -100,77 +87,81 @@ class Profile():
def install(self):
instructions = self.load_instructions()
- if 'args' in instructions:
- self.args = instructions['args']
- if 'post' in instructions:
- instructions = instructions['post']
-
- for title in instructions:
- log(f'Running post installation step {title}')
-
- log('[N] Network Deploy: {}'.format(title))
- if type(instructions[title]) == str:
- log('[N] Loading {} configuration'.format(instructions[title]))
- log(f'Loading {instructions[title]} configuration')
- instructions[title] = Application(self.installer, instructions[title], args=self.args)
- instructions[title].install()
- else:
- for command in instructions[title]:
- raw_command = command
- opts = instructions[title][command] if type(instructions[title][command]) in (dict, OrderedDict) else {}
- if len(opts):
- if 'pass-args' in opts or 'format' in opts:
+ if type(instructions) == Imported:
+ with instructions as runtime:
+ log(f'Profile {self.name} finished successfully.')
+ else:
+ if 'args' in instructions:
+ self.args = instructions['args']
+ if 'post' in instructions:
+ instructions = instructions['post']
+
+ for title in instructions:
+ log(f'Running post installation step {title}')
+
+ log('[N] Network Deploy: {}'.format(title))
+ if type(instructions[title]) == str:
+ log('[N] Loading {} configuration'.format(instructions[title]))
+ log(f'Loading {instructions[title]} configuration')
+ instructions[title] = Application(self.installer, instructions[title], args=self.args)
+ instructions[title].install()
+ else:
+ for command in instructions[title]:
+ raw_command = command
+ opts = instructions[title][command] if type(instructions[title][command]) in (dict, OrderedDict) else {}
+ if len(opts):
+ if 'pass-args' in opts or 'format' in opts:
+ command = command.format(**self.args)
+ ## FIXME: Instead of deleting the two options
+ ## in order to mute command output further down,
+ ## check for a 'debug' flag per command and delete these two
+ if 'pass-args' in opts:
+ del(opts['pass-args'])
+ elif 'format' in opts:
+ del(opts['format'])
+
+ if 'pass-args' in opts and opts['pass-args']:
command = command.format(**self.args)
- ## FIXME: Instead of deleting the two options
- ## in order to mute command output further down,
- ## check for a 'debug' flag per command and delete these two
- if 'pass-args' in opts:
- del(opts['pass-args'])
- elif 'format' in opts:
- del(opts['format'])
-
- if 'pass-args' in opts and opts['pass-args']:
- command = command.format(**self.args)
-
- if 'runas' in opts and f'su - {opts["runas"]} -c' not in command:
- command = command.replace('"', '\\"')
- command = f'su - {opts["runas"]} -c "{command}"'
-
- if 'no-chroot' in opts and opts['no-chroot']:
- log(f'Executing {command} as simple command from live-cd.')
- o = sys_command(command, opts)
- elif 'chroot' in opts and opts['chroot']:
- log(f'Executing {command} in chroot.')
- ## Run in a manually set up version of arch-chroot (arch-chroot will break namespaces).
- ## This is a bit risky in case the file systems changes over the years, but we'll probably be safe adding this as an option.
- ## **> Prefer if possible to use 'no-chroot' instead which "live boots" the OS and runs the command.
- o = sys_command(f"mount /dev/mapper/luksdev {self.installer.mountpoint}")
- o = sys_command(f"cd {self.installer.mountpoint}; cp /etc/resolv.conf etc")
- o = sys_command(f"cd {self.installer.mountpoint}; mount -t proc /proc proc")
- o = sys_command(f"cd {self.installer.mountpoint}; mount --make-rslave --rbind /sys sys")
- o = sys_command(f"cd {self.installer.mountpoint}; mount --make-rslave --rbind /dev dev")
- o = sys_command(f'chroot {self.installer.mountpoint} /bin/bash -c "{command}"')
- o = sys_command(f"cd {self.installer.mountpoint}; umount -R dev")
- o = sys_command(f"cd {self.installer.mountpoint}; umount -R sys")
- o = sys_command(f"cd {self.installer.mountpoint}; umount -R proc")
- else:
- if 'boot' in opts and opts['boot']:
- log(f'Executing {command} in boot mode.')
- defaults = {
- 'login:' : 'root\n',
- 'Password:' : self.args['password']+'\n',
- f'[root@{self.args["hostname"]} ~]#' : command+'\n',
- }
- if not 'events' in opts: opts['events'] = {}
- events = {**defaults, **opts['events']}
- del(opts['events'])
- o = b''.join(sys_command(f'/usr/bin/systemd-nspawn -D {self.installer.mountpoint} -b --machine temporary', events=events))
+
+ if 'runas' in opts and f'su - {opts["runas"]} -c' not in command:
+ command = command.replace('"', '\\"')
+ command = f'su - {opts["runas"]} -c "{command}"'
+
+ if 'no-chroot' in opts and opts['no-chroot']:
+ log(f'Executing {command} as simple command from live-cd.')
+ o = sys_command(command, opts)
+ elif 'chroot' in opts and opts['chroot']:
+ log(f'Executing {command} in chroot.')
+ ## Run in a manually set up version of arch-chroot (arch-chroot will break namespaces).
+ ## This is a bit risky in case the file systems changes over the years, but we'll probably be safe adding this as an option.
+ ## **> Prefer if possible to use 'no-chroot' instead which "live boots" the OS and runs the command.
+ o = sys_command(f"mount /dev/mapper/luksdev {self.installer.mountpoint}")
+ o = sys_command(f"cd {self.installer.mountpoint}; cp /etc/resolv.conf etc")
+ o = sys_command(f"cd {self.installer.mountpoint}; mount -t proc /proc proc")
+ o = sys_command(f"cd {self.installer.mountpoint}; mount --make-rslave --rbind /sys sys")
+ o = sys_command(f"cd {self.installer.mountpoint}; mount --make-rslave --rbind /dev dev")
+ o = sys_command(f'chroot {self.installer.mountpoint} /bin/bash -c "{command}"')
+ o = sys_command(f"cd {self.installer.mountpoint}; umount -R dev")
+ o = sys_command(f"cd {self.installer.mountpoint}; umount -R sys")
+ o = sys_command(f"cd {self.installer.mountpoint}; umount -R proc")
else:
- log(f'Executing {command} in with systemd-nspawn without boot.')
- o = b''.join(sys_command(f'/usr/bin/systemd-nspawn -D {self.installer.mountpoint} --machine temporary {command}'))
- if type(instructions[title][raw_command]) == bytes and len(instructions['post'][title][raw_command]) and not instructions['post'][title][raw_command] in o:
- log(f'{command} failed: {o.decode("UTF-8")}')
- log('[W] Post install command failed: {}'.format(o.decode('UTF-8')))
+ if 'boot' in opts and opts['boot']:
+ log(f'Executing {command} in boot mode.')
+ defaults = {
+ 'login:' : 'root\n',
+ 'Password:' : self.args['password']+'\n',
+ f'[root@{self.args["hostname"]} ~]#' : command+'\n',
+ }
+ if not 'events' in opts: opts['events'] = {}
+ events = {**defaults, **opts['events']}
+ del(opts['events'])
+ o = b''.join(sys_command(f'/usr/bin/systemd-nspawn -D {self.installer.mountpoint} -b --machine temporary', events=events))
+ else:
+ log(f'Executing {command} in with systemd-nspawn without boot.')
+ o = b''.join(sys_command(f'/usr/bin/systemd-nspawn -D {self.installer.mountpoint} --machine temporary {command}'))
+ if type(instructions[title][raw_command]) == bytes and len(instructions['post'][title][raw_command]) and not instructions['post'][title][raw_command] in o:
+ log(f'{command} failed: {o.decode("UTF-8")}')
+ log('[W] Post install command failed: {}'.format(o.decode('UTF-8')))
class Application(Profile):
@property