#!/usr/bin/python3 import cmd, functools, getpass, os, re, subprocess, sys, time, traceback try: import readline except: pass from config import * from lib_autopeer import * NAME_REGEX = r'^[a-zA-Z][a-zA-Z0-9]{0,8}$' def as_from_user(which=None): if which is not None: try: which = int(which) except: raise Exception('your AS number has to be a number, silly') user_ases=[] if not re.match(r'^[a-zA-Z0-9-]+$', MNTNER): raise Exception('your mntner name has bad characters') filenames = subprocess.run(['grep', '-l', '-r', '-P', fr'^\s*mnt-by:\s*{MNTNER}\s*$', '/opt/autopeer/dn42-registry/data/aut-num'], capture_output=True, text=True).stdout for filename in filenames.split('\n'): if not len(filename): continue if (mo := re.match(r'^.*/?AS(\d+)$', filename)): user_ases.append(int(mo.group(1))) else: print(f"oops, something went wrong getting your ASes, specifically: {filename}", file=sys.stderr) if not user_ases: raise Exception("you don't mnt any AS?!") if MY_ASN in user_ases and which: # allow owner to operate as anyone return which elif which is None: # at startup, use the first found return user_ases[0] elif which in user_ases: # if the user picked an AS in their list return which else: raise Exception('not yours') def parse(num_args): def _decorator(f): @functools.wraps(f) def real_func(self, arg): args = arg.split() if len(args) != num_args: # TODO: prompt user for each arg that's missing if f.__doc__: doc = f.__doc__.split('\n', 1)[0] else: doc = '(undocumented args)' print(f'Error: wrong number of args', file=sys.stderr) print(f'Expected {num_args}: {doc}', file=sys.stderr) print(f'Got {len(args)}: {args}', file=sys.stderr) return return f(self, *arg.split()) return real_func return _decorator USER = getpass.getuser() MNTNER = USER.upper() + '-MNT' SELECTED_ASN = as_from_user() class AutopeerShell(cmd.Cmd): def preloop(self): self.intro = f'Welcome to the autopeer shell. Type help or ? to list commands.\nSelected AS: {SELECTED_ASN}' self.prompt = f'{getpass.getuser()}@{socket.gethostname()}:{SELECTED_ASN}> ' def postcmd(self, stop, line): self.prompt = f'{getpass.getuser()}@{socket.gethostname()}:{SELECTED_ASN}> ' return stop def onecmd(self, line): try: return super().onecmd(line) except Exception as e: traceback.print_exc() return False def emptyline(self): return False @parse(0) def do_ls(self): '''(no args) List your peers''' curs = DB.execute('SELECT name, endpoint, port FROM peers WHERE asn=:asn', dict(asn=SELECTED_ASN)) print(f'Active peerings for {SELECTED_ASN}:') while row := curs.fetchone(): print(f'- {row[0]} ({row[1]} {row[2]})') print() def help_addpeer(self): print(self.do_addpeer.__doc__) me = _get_my_info(SELECTED_ASN) print(f''' My endpoint: {me.endpoint}:{me.port} My ASN: {me.asn} My Wireguard Public Key: {me.pubkey} My Tunnel IPv6LL: {me.ipll} ''') @parse(5) def do_addpeer(self, name, pubkey, endpoint, port, ipll): ''' Add a new peer. is an opaque identifier and must be unique for your ASN is your WireGuard public key is your WireGuard IP or hostname is your WireGuard port is your WireGuard IPv6LL''' if not re.match(NAME_REGEX, name): print(f'Error: name must match {NAME_REGEX}', file=sys.stderr) return PUBKEY_REGEX = r'^[=a-zA-Z0-9+/-]+$' if not re.match(PUBKEY_REGEX, pubkey): print(f'Error: pubkey must match {PUBKEY_REGEX}', file=sys.stderr) return ENDPOINT_REGEX = r'^((?#IPv4)[0-9.]+|(?#IPv6)[0-9a-fA-F:]+|(?#DNS)(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9]))$' # (([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9]) via https://stackoverflow.com/questions/106179/regular-expression-to-match-dns-hostname-or-ip-address if not re.match(ENDPOINT_REGEX, endpoint): print(f'Error: wg address must match {ENDPOINT_REGEX}', file=sys.stderr) return PORT_REGEX = r'^\d{1,5}$' if not re.match(PORT_REGEX, port): print(f'Error: port must match {PORT_REGEX}', file=sys.stderr) return IPLL_REGEX = r'^[0-9a-fA-F:]+$' if not re.match(IPLL_REGEX, ipll): print(f'Error: ipv6 link local must match {IPLL_REGEX}', file=sys.stderr) return sp = subprocess.run(['socat', 'stdio', NOTIFY_TO], input=f"[autopeer {socket.gethostname()}] Created new peering {name!r} for AS{SELECTED_ASN} by {USER}", text=True) try: curs = DB.execute( 'INSERT INTO peers (name, asn, pubkey, endpoint, port, ipll, creator_ip, creator_name, creator_date, deleted) VALUES (:name, :asn, :pubkey, :endpoint, :port, :ipll, :creator_ip, :creator_name, :creator_date, 0)', dict(name=name, asn=SELECTED_ASN, pubkey=pubkey, endpoint=endpoint, port=port, ipll=ipll, creator_ip=os.getenv('SSH_CONNECTION'), creator_name=USER, creator_date=time.time()) ) except sqlite3.IntegrityError as e: print("Something went wrong! Maybe try using a name you haven't used already...", file=sys.stderr) return if not curs.rowcount: print("Something went wrong! The database didn't give us an error but no row was inserted!", file=sys.stderr) return print('🎉') print() self.do_showpeer(name) print() self.do_showtemplates(name) print() print() @parse(1) def do_delpeer(self, name): ''' Delete your peering. Note: The deletion will be processed the next time the cronjob runs, at which time the name will become available.''' sp = subprocess.run(['socat', 'stdio', NOTIFY_TO], input=f"[autopeer {socket.gethostname()}] Deleted peering {name!r} for AS{SELECTED_ASN} by {USER}", text=True) curs = DB.execute( 'UPDATE peers SET deleted = 1 WHERE name = :name AND asn = :asn AND deleted = 0', dict(name=name, asn=SELECTED_ASN) ) if curs.rowcount: print(f'Deleted "{name}"') else: print(f'"{name}" (AS {SELECTED_ASN}) not found', file=sys.stderr) @parse(1) def do_showpeer(self, name): ''' Show textual info about a peering''' me = _get_my_info(SELECTED_ASN) you = _get_peer_info(name, SELECTED_ASN) print(f''' Link name: {name} My endpoint: {me.endpoint}:{me.port} My ASN: {me.asn} My Wireguard Public Key: {me.pubkey} My Tunnel IPv6LL: {me.ipll} Your endpoint: {you.endpoint}:{you.port} Your ASN: {you.asn} Your Wireguard Public Key: {you.pubkey} Your Tunnel IPv6LL: {you.ipll} ''') @parse(1) def do_showtemplates(self, name): ''' Show basic config templates for your side''' print('### USE THESE CONFIGS ON YOUR NODE') print(_bird_config(name, _get_my_info(SELECTED_ASN), _get_peer_info(name, SELECTED_ASN))) print(_wg_config(name, _get_my_info(SELECTED_ASN), _get_peer_info(name, SELECTED_ASN))) @parse(1) def do_showconf(self, name): ''' Show my side config ''' print('### MY SIDE CONFIGS') print(_bird_config(name, _get_peer_info(name, SELECTED_ASN), _get_my_info(SELECTED_ASN))) print(_wg_config(name, _get_peer_info(name, SELECTED_ASN), _get_my_info(SELECTED_ASN))) @parse(1) def do_as(self, asn): ''' Select another AS you own''' global SELECTED_ASN try: SELECTED_ASN = as_from_user(asn) except: print("No way! Try an AS you're mntner of!", file=sys.stderr) @parse(0) def do_birdc(self): '''Run birdc''' os.system('/usr/sbin/birdc -r') @parse(1) def do_stat(self, name): ''' Show link stats''' if not re.match(NAME_REGEX, name): print(f'Error: name must match {NAME_REGEX}', file=sys.stderr) return os.system(f'ip stat show dev wg{SELECTED_ASN%10000:04}{name} group link') @parse(0) def do_exit(self): return True def do_EOF(self, arg): print("^D") return True @parse(0) def do_reload(self): os.execvp(sys.argv[0], sys.argv) @parse(0) def do_version(self): print("autopeer version:", end=' ') sys.stdout.flush() os.system("git -c safe.directory=/opt/autopeer -C /opt/autopeer/ describe --always") print("by steering7253 https://steering.dn42 https://st33ri.ng") print("Source available at https://cgit.dn42/steering/autopeer.git/ https://cgit.space/steering/autopeer.git/") if __name__ == '__main__': shell = AutopeerShell() command = os.getenv('SSH_ORIGINAL_COMMAND', '') if len(command): sp = subprocess.run(['socat', 'stdio', NOTIFY_TO], input=f"[autopeer {socket.gethostname()}] {USER} ran {command!r} via {os.getenv('SSH_CONNECTION')}", text=True) shell.onecmd(command) else: sp = subprocess.run(['socat', 'stdio', NOTIFY_TO], input=f"[autopeer {socket.gethostname()}] {USER} logged in via {os.getenv('SSH_CONNECTION')}", text=True) shell.cmdloop()