#!/usr/bin/python3 #TODO: capture ^C during commands import cmd, functools, getpass, os, re, subprocess, sys, time try: import readline except: pass from config import * from lib_autopeer import * def as_from_user(which=None): if which is not None: try: which = int(which) except: raise Exception('your AS number has to be a number, silly') user_ases=[] if not re.match(r'^[a-zA-Z0-9-]+$', MNTNER): raise Exception('your mntner name has bad characters') filenames = subprocess.run(['grep', '-l', '-r', '-P', fr'^\s*mnt-by:\s*{MNTNER}\s*$', '/opt/autopeer/dn42-registry/data/aut-num'], capture_output=True, text=True).stdout for filename in filenames.split('\n'): if not len(filename): continue if (mo := re.match(r'^.*/?AS(\d+)$', filename)): user_ases.append(int(mo.group(1))) else: print(f"oops, something went wrong getting your ASes, specifically: {filename}", file=sys.stderr) if MY_ASN in user_ases and which: # allow owner to operate as anyone return which elif which is None: # at startup, use the first found return user_ases[0] elif which in user_ases: # if the user picked an AS in their list return which else: raise Exception('not yours') def parse(num_args): def _decorator(f): @functools.wraps(f) def real_func(self, arg): args = arg.split() if len(args) != num_args: # TODO: prompt user for each arg that's missing if f.__doc__: doc = f.__doc__.split('\n', 1)[0] else: doc = '(undocumented args)' print(f'Error: wrong number of args', file=sys.stderr) print(f'Expected {num_args}: {doc}', file=sys.stderr) print(f'Got {len(args)}: {args}', file=sys.stderr) return return f(self, *arg.split()) return real_func return _decorator USER = getpass.getuser() MNTNER = USER.upper() + '-MNT' SELECTED_ASN = as_from_user() class AutopeerShell(cmd.Cmd): def preloop(self): self.intro = f'Welcome to the autopeer shell. Type help or ? to list commands.\nSelected AS: {SELECTED_ASN}' self.prompt = f'{getpass.getuser()}@{socket.gethostname()}:{SELECTED_ASN}> ' def postcmd(self, stop, line): self.prompt = f'{getpass.getuser()}@{socket.gethostname()}:{SELECTED_ASN}> ' return stop @parse(0) def do_ls(self): '''(no args) List your peers''' curs = DB.execute('SELECT name, endpoint, port FROM peers WHERE asn=:asn', dict(asn=SELECTED_ASN)) print(f'Active peerings for {SELECTED_ASN}:') while row := curs.fetchone(): print(f'- {row[0]} ({row[1]} {row[2]})') print() @parse(5) def do_addpeer(self, name, pubkey, endpoint, port, ipll): ''' Add a new peer''' NAME_REGEX = r'^[a-zA-Z][a-zA-Z0-9]{0,8}$' if not re.match(NAME_REGEX, name): print(f'Error: name must match {NAME_REGEX}', file=sys.stderr) return PUBKEY_REGEX = r'^[=a-zA-Z0-9+/-]+$' if not re.match(PUBKEY_REGEX, pubkey): print(f'Error: pubkey must match {PUBKEY_REGEX}', file=sys.stderr) return ENDPOINT_REGEX = r'^((?#IPv4)[0-9.]+|(?#IPv6)[0-9a-fA-F:]+|(?#DNS)(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9]))$' # (([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9]) via https://stackoverflow.com/questions/106179/regular-expression-to-match-dns-hostname-or-ip-address if not re.match(ENDPOINT_REGEX, endpoint): print(f'Error: wg address must match {ENDPOINT_REGEX}', file=sys.stderr) return PORT_REGEX = r'^\d{1,5}$' if not re.match(PORT_REGEX, port): print(f'Error: port must match {PORT_REGEX}', file=sys.stderr) return IPLL_REGEX = r'^[0-9a-fA-F:]+$' if not re.match(IPLL_REGEX, ipll): print(f'Error: ipv6 link local must match {IPLL_REGEX}', file=sys.stderr) return try: curs = DB.execute( 'INSERT INTO peers (name, asn, pubkey, endpoint, port, ipll, creator_ip, creator_name, creator_date) VALUES (:name, :asn, :pubkey, :endpoint, :port, :ipll, :creator_ip, :creator_name, :creator_date)', dict(name=name, asn=SELECTED_ASN, pubkey=pubkey, endpoint=endpoint, port=port, ipll=ipll, creator_ip=os.getenv('SSH_CONNECTION'), creator_name=USER, creator_date=time.time()) ) except sqlite3.IntegrityError as e: print("Something went wrong! Maybe try using a name you haven't used already...", file=sys.stderr) return if not curs.rowcount: print("Something went wrong! The database didn't give us an error but no row was inserted!", file=sys.stderr) return print('🎉') print() self.do_showpeer(name) print() self.do_showbird(name) print() self.do_showwg(name) print() print() @parse(1) def do_delpeer(self, name): ''' Delete your peering''' curs = DB.execute( 'DELETE FROM peers WHERE name = :name AND asn = :asn', dict(name=name, asn=SELECTED_ASN) ) if curs.rowcount: print(f'Deleted "{name}"') else: print(f'"{name}" (AS {SELECTED_ASN}) not found', file=sys.stderr) @parse(1) def do_showpeer(self, name): me = _get_my_info(SELECTED_ASN) you = _get_peer_info(name, SELECTED_ASN) print(f''' Link name: {name} My endpoint: {me.endpoint}:{me.port} My ASN: {me.asn} My Wireguard Public Key: {me.pubkey} My Tunnel IPv6LL: {me.ipll} Your endpoint: {you.endpoint}:{you.port} Your ASN: {you.asn} Your Wireguard Public Key: {you.pubkey} Your Tunnel IPv6LL: {you.ipll} ''') @parse(1) def do_showbird(self, name): ''' Show basic bird config''' print(_bird_config(name, _get_my_info(SELECTED_ASN), _get_peer_info(name, SELECTED_ASN))) @parse(1) def do_showwg(self, name): ''' Show basic Wireguard config ''' print(_wg_config(name, _get_my_info(SELECTED_ASN), _get_peer_info(name, SELECTED_ASN))) @parse(1) def do_as(self, asn): ''' Select another AS you own''' global SELECTED_ASN try: SELECTED_ASN = as_from_user(asn) except: print("No way! Try an AS you're mntner of!", file=sys.stderr) @parse(0) def do_exit(self): return True def do_EOF(self, arg): print("^D") return True @parse(0) def do_reload(self): os.execvp(sys.argv[0], sys.argv) @parse(0) def do_version(self): print("autopeer version:") os.system("git -c safe.directory=/opt/autopeer -C /opt/autopeer/ describe --always") print("by steering7253 https://steering.dn42 https://st33ri.ng") if __name__ == '__main__': shell = AutopeerShell() if len(os.getenv('SSH_ORIGINAL_COMMAND', '')): shell.onecmd(os.getenv('SSH_ORIGINAL_COMMAND')) else: shell.cmdloop()