""" This is the PyJail, a jailing tool for running Python apps in a sandboxed environment. Version: 0.2.1-next1 """ import os import time import runpy class PyJail: """ The jail manager, handles all system calls and such. """ def __init__(self, debug=False): self.rootpath = "" self.rootpath = self.fs() self._debug = debug with open(self.fs("/proc/klog"), "w") as f: # Always use jailmgr.msg() from this point onwards. f.write(f"[{time.time}] [jailmgr.__init__()] [INFO] START LOG") f.close() with open(self.fs("/proc/kproc"), "w") as f: f.write("proc: jailmgr(1)") self._program_counter = 2 f.close() self._resolve_symlinks = False if os.path.isdir(self.fs("/bin")) else True def run_program(self, path_to_bin): """ Runs a specified program. """ path_to_bin = self.fs(path_to_bin) # print(path_to_bin) # print(str(self.rootpath) + str(path_to_bin)) if path_to_bin == 3 or path_to_bin == 2: self.msg("jailmgr.run_program()", "An error has occurred launching the program.", True, "WARNING") else: with open(self.fs("/proc/kproc"), "a+") as f: f.write(f"proc: {path_to_bin}({self._program_counter})") self._program_counter += 1 runpy.run_path(path_to_bin) def msg(self, caller: str, message: str, emit: bool = False, log_level: str = "INFO"): """ The custom message parser, can parse messages and alert apps of said messages. Replaces print statements. Args: caller: The program that called the logger message: Is the message to parse. emit: If the message needs to be passed to apps. log_level: The loglevel, either DEBUG, INFO, WARNING, ERROR, CRITICAL """ if self._debug is True: emit = True accepted_log_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] if log_level.upper() not in accepted_log_levels: self.msg(f"jailmgr.msg()",f"Not accepted loglevel!! {log_level}", False, "ERROR") with open(self.fs("/proc/klog"), "a+") as f: f.write(f"[{time.time}] [{caller}] [{log_level}] {message}") if emit is True: print(message) return 0 def fs(self, check_path=None, resolve_symlinks=True): """ Keeps track of the jailed filesystem and makes sure any calls to any file get done in the jailed filesystem """ if check_path is not None: if os.path.exists(f"{self.rootpath}{check_path}"): if self._resolve_symlinks is True or resolve_symlinks is True: # This exists to ease /bin and other symlinks that are always present. # It allows the system to drastically speed up resolving symlinks. symlinkable_dirs = ["/bin", "/sbin", "/lib", "/lib64"] for directory in symlinkable_dirs: if check_path.startswith(directory): check_path_usr_merge = f"/usr{check_path}" return self.rootpath + check_path_usr_merge # Well, if it doesn't start with any of them, we need to check each and every directory. check_path_split = check_path.split("/", -1) prepend_path = "" for i, path in enumerate(check_path_split): prepend_path = f"{prepend_path}/{path}" if not path.endswith("/") else f"{prepend_path}/{path}/" check_path_split[i] = f"{prepend_path}" check_path = "" for i, path in enumerate(check_path_split): if os.path.isdir(path) and i != (len(check_path_split) - 1): # Directory is not a symlink, we can just ignore and move on. pass elif os.path.isdir(path) and i == (len(check_path_split) - 1): # The last thing to access was a directory. We can safely return the full path now. return path elif not os.path.isdir(path) and i != (len(check_path_split) - 1): # This was not the last thing we needed to access, so we assume it's a symlink. # One problem is that we can't be sure, so we make sure it is a symlink. with open(self.fs(path, False)) as f: is_symlink = f.read() f.close() if is_symlink.startswith("symlnk"): # This is a symlink! # Symlinks always contain the full literal path that they need to access, so we can # take that and do the same trick to split it and add the next things to it. # raise NotImplementedError() is_symlink_split = is_symlink.split(" ", 1) symlink_dest = is_symlink_split[1] symlink_dest = f"{symlink_dest}/{path}" return symlink_dest else: # This is either not a symlink or an improperly configured one. self.msg("jailmgr.fs()", "reached non-symlink file not at end of list", False, "ERROR") self.msg("jailmgr.fs()", "What was assumed to be a directory isn't a" " directory nor a symlink! This might be because of a " "typo or misconfigured symlink. The directory in question: " f"{path}", True, "WARNING") return 2 return self.rootpath + check_path if check_path.startswith("."): check_path = check_path.lstrip(".") return os.getcwd() + check_path if "vfs" in os.getcwd() else 2 elif self.rootpath in check_path: self.msg(f"jailmgr.fs()", "Cannot parse rootpath, expected vfspath", log_level="ERROR") return 3 else: # Path is not in the jailed fs, so we say it doesn't exist. self.msg(f"jailmgr.fs()", "File/directory doesn't exist in vfspath", log_level="ERROR") return 2 else: rootpath = os.getcwd() + "/vfs" self.msg("jailmgr.fs()", message=rootpath, log_level="INFO") return rootpath def kver(self): """ Returns the kernel version """ return "0.2.1-next1" def netsock(self, ip, port, mode, msg): """ An easy interface to network sockets, built right into the jailmanager Args: ip: The IP of the server to access. port: The port to access the server on mode: Either UDP, TCP or PKG (HTTP) msg: The message to send the server Returns: Whatever the server returns. """ raise NotImplementedError("TODO: Netsock will be implemented once 0.3.0 comes around!")