title: GSoC 2014 - Working for OWASP on OWTF
author: depierre
published: 2014-05-10
categories: Gsoc 2014, Python
keywords: gsoc, 2014, project, owasp, owtf, security, pentest, python

I applied for the 2014 edition of the [Google Summer of Code](https://www.google-melange.com/gsoc/document/show/gsoc_program/google/gsoc2014/about_page) a couple of months ago and I have been accepted! For the next few months I will therefore be working on the [OWASP](https://www.owasp.org/index.php/Main_Page) - [OWTF](https://www.owasp.org/index.php/OWASP_OWTF) project.

This first post describes the OWTF tool and the project I have to implement by August. It then explains the few contributions I have been working on over the past three weeks. It is the first post of a monthly series that will show my progress on the [Automated Ranking System](https://www.owasp.org/index.php/GSoC2014_Ideas#OWASP_OWTF_-_Automated_Vulnerability_Severity_Rankings) I am implementing.

# The OWASP - OWTF project

> The OWASP Offensive Web Testing Framework (OWTF) aims to provide an efficient
> approach to combine out-of-the-box thinking that only a human can provide
> with the automated work from a machine. It gathers a complete set of plugins
> and merges their results into one complete interactive report. The pentester
> then has the possibility to add notes, to change details, and to add media
> like screenshots, in order to have a final report.

In other words, OWTF is a tool that automatically runs a lot of plugins against targets chosen by a user. It then gathers the results into one report that the user can modify and even use as the draft version of a test report. In my opinion, this is something really cool and it is awesome to be part of its development.

# The automated ranking system

> The current version of OWTF provides complete interactive reports of web and
> network tests. But it does not provide automated vulnerability rankings
> yet.
>
> My project is to enhance OWTF in order to provide an automated ranking for
> each plugin. This will allow the human to focus attention on the most likely
> weak areas of a web application or network first, which will be valuable to
> efficiently use the available time in a penetration test.

Long story short, my project will give OWTF the ability to rank the plugins according to what they have discovered. The current flowchart looks like the following:

![OWTF Automated ranking system](/static/images/gsoc2k14/owtf_ranking_system.png)

The project will require a lot of work, mostly on how to retrieve rankings from tools. Getting in touch with the contributors of the [ThreadFix project](https://github.com/denimgroup/threadfix/) gave me a good overview of what to do.

Currently, the idea is to create a *standalone* library that will parse the outputs of tools that already provide automated rankings. It will have to support most of the existing tools and most of their versions. Being standalone will also make it easier to build a test framework around it.

# First contribution - Easy way for new attributes

In order to set a default ranking for each plugin, I needed an easy way to declare a new attribute for each plugin.

## Before - Some magic shell

OWTF retrieves the `DESCRIPTION` attribute using a shell command. The `DESCRIPTION` variable is declared in each plugin and briefly describes its aim.

    :::python
    class PluginDB(object):
        # [. . .]
        def LoadFromFileSystem(self):
            # This command finds all the plugins and gets their descriptions
            # in one go.
            PluginFinderCommand = "for i in $(find " + \
                self.Core.Config.FrameworkConfigGet('PLUGINS_DIR') + \
                " -name '*.py'); " \
                "do echo \"$i#$(grep ^DESCRIPTION $i|sed 's/ = /=/'|" \
                "cut -f2 -d=)\"; done | sort"
            session = self.PluginDBSession()
            for line in self.Core.Shell.shell_exec(PluginFinderCommand).split("\n"):
                if not line:
                    continue  # Skip blank lines.
                Plugin = line.strip().replace(
                    self.Core.Config.FrameworkConfigGet('PLUGINS_DIR'),
                    '')  # Remove plugin directory part of the path.
                PluginFile, PluginDescrip = Plugin.split('#')
                # Get rid of surrounding quotes.
                PluginDescrip = PluginDescrip[1:-1]
                PluginChunks = PluginFile.split('/')
                # i.e. all modules have a group. i.e. for web plugins: types
                # are -> passive, semi_passive, active, grep.
                if (len(PluginChunks) == 3):
                    PluginGroup, PluginType, PluginFile = PluginChunks
                PluginName, PluginCode = PluginFile.split('@')
                PluginCode = PluginCode.split('.')[0]  # Get rid of the ".py"
                session.merge(
                    models.Plugin(
                        key=PluginType + '@' + PluginCode,
                        group=PluginGroup,
                        type=PluginType,
                        title=PluginName.title().replace('_', ' '),
                        name=PluginName,
                        code=PluginCode,
                        file=PluginFile,
                        descrip=PluginDescrip
                    )
                )
            session.commit()

From line 6 to 10 we can read **the magic shell command** that retrieves the `DESCRIPTION` attribute. For each plugin file, it greps the `DESCRIPTION` line and keeps only what follows the `=` sign; the surrounding quotes are then stripped on the Python side.

It is easy to see how hard this system is to maintain as soon as one wants to add a new attribute. To add a boolean attribute, for instance, the output of the shell command would have to be passed to a Python function like [`strtobool`](https://docs.python.org/2.7/distutils/apiref.html#distutils.util.strtobool).

**The main drawback of this function is that it is not modular.** In order to add a new attribute for a plugin, several places have to be modified:

1. The shell command
2. The cast of its output if needed
3. Saving the cast output into the database.

## After - Some magic python

For my project, I will have to add attributes to each plugin: at least one that will hold the default ranking value (among *High*, *Medium*, *Low* and *Informational*). Therefore I wanted to rewrite `LoadFromFileSystem` in a more *pythonic* way, for instance using the [`imp` module](https://docs.python.org/2/library/imp.html).
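
To make the contrast with the shell approach concrete, here is a minimal sketch of loading a plugin file as a module and reading its attributes directly. It is only an illustration under a few assumptions: it targets Python 3, where `importlib` replaces the `imp` module, and `load_plugin_attributes`, `demo`, the `dummy_plugin.py` file and its `IS_INTRUSIVE` attribute are hypothetical names that do not exist in OWTF.

```python
import importlib.util
import os
import tempfile


def load_plugin_attributes(plugin_path):
    """Import a plugin file and return (DESCRIPTION, IS_INTRUSIVE).

    Hypothetical helper for illustration only, not part of OWTF.
    Missing attributes are returned as None.
    """
    module_name = os.path.splitext(os.path.basename(plugin_path))[0]
    # Build a module object from the file path and execute its code.
    spec = importlib.util.spec_from_file_location(module_name, plugin_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    # Attributes keep their native Python types, no parsing involved.
    description = getattr(module, 'DESCRIPTION', None)
    is_intrusive = getattr(module, 'IS_INTRUSIVE', None)
    return description, is_intrusive


def demo():
    """Write a throwaway plugin file, load it and return what was read."""
    source = (
        'DESCRIPTION = "Dummy plugin"\n'
        'IS_INTRUSIVE = True\n'  # Hypothetical boolean attribute.
    )
    with tempfile.TemporaryDirectory() as plugins_dir:
        plugin_path = os.path.join(plugins_dir, 'dummy_plugin.py')
        with open(plugin_path, 'w') as handle:
            handle.write(source)
        return load_plugin_attributes(plugin_path)
```

Because the file is really imported, a boolean attribute comes back as a Python `bool`, so no `strtobool`-style cast of shell output is needed.
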
The idea was **to dynamically load each plugin and instantly retrieve its attributes**. When discussing with the other OWTF contributors, it came up that it would be **a good thing to have a dictionary for all the future extra attributes** that might appear. By coupling the dynamic loading with JSON encoding, the extra attributes are encoded into one single JSON string and saved into the database. That way no further database schema modification will be needed.

Here is the final version of the implementation of the attributes dictionary:

    :::python
    import imp
    import json
    import os


    class PluginDB(object):
        # [. . .]
        def LoadFromFileSystem(self):
            # Retrieve the list of the plugins (sorted) from the directory given by
            # 'PLUGINS_DIR'.
            plugins = []
            for root, _, files in os.walk(self.Core.Config.FrameworkConfigGet('PLUGINS_DIR')):
                plugins.extend([
                    os.path.join(root, filename) for filename in files
                    if filename.endswith('.py')])
            plugins = sorted(plugins)
            # Open the database session, as the previous version did.
            session = self.PluginDBSession()
            # Retrieve the information of the plugin.
            for plugin_path in plugins:
                # Only keep the relative path to the plugin
                plugin = plugin_path.replace(
                    self.Core.Config.FrameworkConfigGet('PLUGINS_DIR'), '')
                # Retrieve the group, the type and the file of the plugin.
                chunks = plugin.split(os.path.sep)
                if len(chunks) == 3:
                    group, type, file = chunks
                # Retrieve the internal name and code of the plugin.
                name, code = os.path.splitext(file)[0].split('@')
                # Load the plugin as a module.
                filename, pathname, desc = imp.find_module(
                    os.path.splitext(os.path.basename(plugin_path))[0],
                    [os.path.dirname(plugin_path)])
                plugin_module = imp.load_module(
                    os.path.splitext(file)[0],
                    filename,
                    pathname,
                    desc)
                # Try to retrieve the `ATTR` dictionary from the module and
                # convert it to JSON in order to save it into the database.
                attr = None
                try:
                    attr = json.dumps(plugin_module.ATTR)
                except AttributeError:  # The plugin didn't define an attr dict.
                    pass
                # Save the plugin into the database.
                session.merge(
                    models.Plugin(
                        key=type + '@' + code,
                        group=group,
                        type=type,
                        title=name.title().replace('_', ' '),
                        name=name,
                        code=code,
                        file=file,
                        descrip=plugin_module.DESCRIPTION,
                        attr=attr
                    )
                )
            session.commit()

From now on, each plugin can be extended simply by:

1. Defining a new key/value pair in its `ATTR` dictionary

I went **from a 3-step to a 1-step** modification in order to add a default ranking value for a plugin :)

# Second contribution - Classful plugin system

After spending some time on OWTF's plugin system, an idea grew in my mind: implementing a **classful plugin system**. The current version of the project has a `plugin_helper` file that declares every useful function that a plugin might need. I thought it would be better to split that file into a set of classes, each of them specialized for one type of plugin. For instance an `ActivePlugin`, a `PassivePlugin`, etc.

This classful system aims to be more modular than the current one and nicer in its coding style. I am still working on it but I already have a draft version of the `ActivePlugin` class implementation. Please keep in mind that the code below is a *work in progress*.

## Abstract plugin

In the current hierarchy, every plugin class inherits from an abstract one that provides the default methods.

    :::python
    class AbstractPlugin(object):
        """Abstract plugin declaring basic methods."""

        RESOURCES = None

        def __init__(self, core, plugin_info, resources=None, *args, **kwargs):
            """Self-explanatory."""
            # A plugin has a reference to the Core object.
            self.core = core
            # Keep track of the abort
            self.framework_abort = False
            self.plugin_abort = False
            # Keep track of the elapsed time
            self.elapsed_time = None
            # A plugin contains several pieces of information like a group, a
            # type, etc.
            self.info = None
            if AbstractPlugin.is_valid_info(plugin_info):
                self.info = plugin_info
            else:
                # The information is not valid, throw something
                # TODO: Create a custom error maybe?
                raise ValueError(
                    "The information of the plugin did not fulfill "
                    "the requirements.")
            # A plugin might have a resource, which might contain the command
            # that will be run, for instance.
            self.resources = resources or self.RESOURCES
            if self.resources is not None:
                if isinstance(self.resources, basestring):
                    self.resources = self.core.DB.Resource.GetResources(
                        self.resources)
                else:  # Assuming that resources is a list.
                    self.resources = self.core.DB.Resource.GetResourceList(
                        self.resources)
            # The output of a plugin is saved into its attribute `output` and its
            # type is saved into `type`.
            self.output = None
            self.type = None

        def run(self):
            """Callback function that actually runs the plugin."""
            raise NotImplementedError('A plugin MUST implement the run method.')

        @staticmethod
        def is_valid_info(info):
            """Check that the information of a plugin is correct."""
            # Check if a group is specified and if it is a valid one.
            if 'group' not in info or info['group'] not in TEST_GROUPS:
                return False
            # Check if a type is specified and if it is a valid one.
            if 'type' not in info or info['type'] not in VALID_TYPES:
                return False
            # TODO: Check the other info.
            # Everything's fine about the information
            return True

        def _init_output_dir(self):
            """Create the output directory of the plugin and store its path."""
            # Retrieve the relative path of the plugin output.
            base_path = ''
            if self.info['group'] in [WEB_GROUP, NET_GROUP]:
                base_path = self.core.DB.Target.GetPath('PARTIAL_URL_OUTPUT_PATH')
            elif self.info['group'] == AUX_GROUP:
                base_path = self.core.Config.Get('AUX_OUTPUT_PATH')
            output_dir = os.path.join(
                base_path,
                os.path.join(
                    clean_filename(self.info['title']),
                    self.info['type'])
            )
            # FULL output path for plugins to use
            self.core.DB.Target.SetPath(
                'PLUGIN_OUTPUT_DIR',
                os.path.join(os.getcwd(), output_dir))
            # Force the creation of the directory if it does not exist yet.
            self.core.CreateMissingDirs(output_dir)
            self.output_dir = output_dir

        def dump(self, type='type', output='output'):
            """Return the result of a plugin.

            Generate a dictionary from the attributes `type` and `output` and
            return it wrapped in a list.

            """
            return [{type: self.type, output: self.output}]

As you can see, there are a lot of *TODO*s in the code since it is a WIP. At the moment, this abstract class provides three basic methods:

1. `is_valid_info` is a static method that checks if the information of a plugin is correct.
2. `_init_output_dir` is an internal method that creates the output directory of the plugin if it is missing.
3. `dump` is a method that acts like the Python [`__repr__`](https://docs.python.org/2/reference/datamodel.html#object.__repr__) function since it translates the output of the plugin into something understandable for OWTF.

## Active plugin

At the moment, I am still working on the relationships between each layer and each class. So far, a plugin may or may not need to run a shell command in order to run a specific tool. Almost all the active plugins need to run a command whereas passive plugins do not. Therefore, I have implemented an extra class layer called `AbstractRunCommandPlugin` that only adds one new method, allowing a plugin to run an actual shell command according to its resources.

    :::python
    class AbstractRunCommandPlugin(AbstractPlugin):
        """Abstract plugin that runs a shell command."""

        def __init__(self, *args, **kwargs):
            """Self-explanatory."""
            AbstractPlugin.__init__(self, *args, **kwargs)
            self.cmd_modified = None
            self.raw_output = None

        def run_command(self, cmd):
            """Run the shell command of the plugin."""
            if not hasattr(self, 'output_dir'):
                self._init_output_dir()
            # Keep track of the elapsed time.
            self.core.Timer.StartTimer('run_command')
            self.cmd_modified = self.core.Shell.GetModifiedShellCommand(
                cmd,
                self.output_dir)
            # Run the shell command.
            try:
                self.raw_output = self.core.Shell.shell_exec_monitor(
                    self.cmd_modified)
            except PluginAbortException as partial_output:
                self.raw_output = str(partial_output.parameter)
                self.plugin_abort = True
            except FrameworkAbortException as partial_output:
                self.raw_output = str(partial_output)
                self.framework_abort = True
            # Save the elapsed time.
            self.elapsed_time = self.core.Timer.GetElapsedTimeAsStr('run_command')
            log('Time=' + self.elapsed_time)

The `ActivePlugin` class then inherits from `AbstractRunCommandPlugin` and implements the default `run()` method, which is the method OWTF will call.

    :::python
    class ActivePlugin(AbstractRunCommandPlugin):
        """Active plugin."""

        def __init__(self, core, plugin_info, resources=None,
                     cmd_intro='Test command', output_intro='Output',
                     prev_output=None, *args, **kwargs):
            """Self-explanatory."""
            AbstractRunCommandPlugin.__init__(
                self, core, plugin_info, resources, *args, **kwargs)
            self.cmd_intro = cmd_intro
            self.output_intro = output_intro
            # Default to an empty list so that it can be concatenated with the
            # plugin output when an abort exception is raised.
            self.prev_output = prev_output or []
            self._init_output_dir()

        def run(self):
            """Callback function which is run by OWTF.

            Default ActivePlugin behaviour.

            This function can be overridden by the user when declaring an
            ActivePlugin. That way, the user can take into account specific
            usages.

            """
            return self.command_run()

        def command_run(self):
            """Run the plugin command and format its output."""
            output_list = []
            for name, cmd in self.resources:
                self.run_command(cmd)
                self.type = 'CommandDump'
                self.output = {
                    'Name': None,  # TODO: Write GetCommandOutputFileNameAndExtension
                    'CommandIntro': self.cmd_intro,
                    'ModifiedCommand': self.cmd_modified,
                    'RelativeFilePath': self.core.PluginHandler.DumpOuputFile(
                        name,
                        self.raw_output,
                        self.info,
                        RelativePath=True),
                    'OutputIntro': self.output_intro,
                    'TimeStr': self.elapsed_time}
                plugin_output = self.dump()
                # This command returns URLs for processing
                if name == self.core.Config.FrameworkConfigGet('EXTRACT_URLS_RESERVED_RESOURCE_NAME'):
                    plugin_output = self.log_urls()
                if self.plugin_abort:
                    raise PluginAbortException(self.prev_output + plugin_output)
                if self.framework_abort:
                    raise FrameworkAbortException(self.prev_output + plugin_output)
                output_list += plugin_output
            return output_list

        # TODO: Write the doc string.
        def log_urls(self):
            # Keep track of the elapsed time.
            self.core.Timer.StartTimer('log_urls')
            urls = self.raw_output.strip().split('\n')
            self.core.DB.URL.ImportUrls(urls)
            nb_found = 0
            visit_urls = False
            # TODO: Whether or not active testing will depend on the user profile
            # ;).
            # Have cool ideas for profile names
            if True:
                visit_urls = True
                nb_found = sum([
                    transaction.Found
                    for transaction in self.core.Requester.GetTransactions(
                        True,
                        self.core.DB.URL.GetURLsToVisit())
                ])
            self.elapsed_time = self.core.Timer.GetElapsedTimeAsStr('log_urls')
            log('Spider/URL scraper time=' + self.elapsed_time)
            self.type = 'URLsFromStr'
            self.output = {
                'TimerStr': self.elapsed_time,
                'VisitUrls': visit_urls,
                'URLList': urls,
                'NumFound': nb_found}
            return self.dump()

With the current version of the classful plugin system, I went from a basic active plugin like:

    :::python
    DESCRIPTION = "Active Vulnerability Scanning without credentials via Arachni"


    def run(Core, PluginInfo):
        return Core.PluginHelper.CommandDump(
            'Test Command',
            'Output',
            Core.DB.Resource.GetResources('Arachni_Unauth'),
            PluginInfo,
            [])

To a classful version like:

    :::python
    from framework.plugin.plugins import ActivePlugin


    class ArachniUnauthPlugin(ActivePlugin):
        """Active Vulnerability Scanning without credentials via Arachni."""

        RESOURCES = 'Arachni_Unauth'

From my point of view, this new system has several pros:

1. Declaring a basic active plugin requires less code than before.
2. The default behaviour can easily be modified by overriding the `run` method.
3. The `DESCRIPTION` attribute becomes the *docstring* of the class, which looks better in my opinion.
4. The source code is more modular than the previous one.

I still have a lot of work to do on this system since I have only considered the active plugins for now. I will also have to think about the `ATTR` dictionary shown in the previous part.

# Conclusion

During these first 3 weeks of GSoC 2014 working on the OWASP - OWTF project, I have managed to improve a few things in the code:

1. Allow the user to add a new attribute to a plugin without requiring any modification of the framework source code.
2. A classful plugin system that eases the creation of new plugins for the user.

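
The second point can be illustrated with a stripped-down, self-contained sketch of the pattern. It is not the OWTF code: `BasePlugin`, `CommandPlugin` and `ExamplePlugin` are hypothetical names, and the `run` method here simply echoes the resource name instead of running a shell command.

```python
class BasePlugin(object):
    """Base class holding the behaviour shared by every plugin."""

    # Class-level default, overridden by concrete plugins.
    RESOURCES = None

    def __init__(self, resources=None):
        # Fall back on the class-level default when nothing is given.
        self.resources = resources or self.RESOURCES

    @property
    def description(self):
        # The class docstring plays the role of the DESCRIPTION variable.
        return (self.__doc__ or '').strip()

    def run(self):
        raise NotImplementedError('A plugin MUST implement the run method.')


class CommandPlugin(BasePlugin):
    """Intermediate layer providing a default `run` implementation."""

    def run(self):
        # Stand-in for running a command: only report the resource name.
        return [{'type': 'CommandDump', 'output': self.resources}]


class ExamplePlugin(CommandPlugin):
    """Active scan without credentials."""

    RESOURCES = 'Arachni_Unauth'
```

With such a layout, a new plugin boils down to a docstring and a `RESOURCES` value, while `ExamplePlugin().run()` reuses the default behaviour of the intermediate class.
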
Of course, the second point is not complete yet but I am confident that I will finish this system soon.

See you **next month for the second monthly GSoC post** on my work!