I applied for the 2014 edition of the Google Summer of Code a couple of months ago and I have been accepted!
Therefore, for the next few months I will be working on the OWASP - OWTF project.
This first post describes the OWTF tool and the project I will have to implement by August. Then it explains the few contributions I have been working on for the past three weeks.
It is the first post from a monthly series that will show my progress on the Automated Ranking System I am implementing.
The OWASP - OWTF project
The OWASP Offensive Web Testing Framework (OWTF) aims to provide an efficient approach to combine the out-of-the-box thinking that only a human can provide with the automated work of a machine. It gathers a complete set of plugins and merges their results into one complete interactive report. The pentester can then add notes, change details, and add media such as screenshots in order to produce a final report.
In other words, OWTF is a tool that will automatically run a lot of plugins against targets chosen by a user. Then, it will gather the results into one report that the user can modify and even use as the draft version of a test report.
In my opinion, this is something really cool and it is awesome to be part of its development.
The automated ranking system
The current version of OWTF provides complete interactive reports of web and network tests. But it does not provide automated vulnerability rankings yet.
My project is to enhance OWTF in order to provide an automated ranking for each plugin. This will allow the human to focus attention on the most likely weak areas of a web application or network first, which will be valuable to efficiently use the available time in a penetration test.
Long story short, my project will give OWTF the ability to rank the plugins according to what they have discovered.
The current flowchart looks like the following:
The project will require a lot of work, mostly focused on how to retrieve rankings from tools. Getting in touch with the contributors of the ThreadFix project gave me a good overview of what to do.
Currently, the idea is to create a standalone library that will parse
the outputs of tools that already provide automated rankings.
It will have to support most of the existing tools and most of their versions.
Being standalone will allow an easier test framework implementation too.
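To make the idea more concrete, here is a minimal sketch of one small piece such a library could contain. It is not the actual implementation: the names normalize_ranking and highest_ranking, the alias table and the sample severity labels are all assumptions made for illustration. It only shows how severities reported by different tools could be mapped onto one common scale.

```python
# Hypothetical sketch: the names and the alias table are assumptions,
# they are not part of OWTF or of any existing tool.
RANKINGS = ('Informational', 'Low', 'Medium', 'High')

# Assumed labels that different tools might use for the same level.
ALIASES = {
    'info': 'Informational',
    'informational': 'Informational',
    'low': 'Low',
    'medium': 'Medium',
    'moderate': 'Medium',
    'high': 'High',
}


def normalize_ranking(raw_severity):
    """Map a tool-specific severity label to a common ranking, or None."""
    return ALIASES.get(raw_severity.strip().lower())


def highest_ranking(raw_severities):
    """Return the highest known ranking among the labels, or None."""
    known = [r for r in map(normalize_ranking, raw_severities) if r]
    if not known:
        return None
    return max(known, key=RANKINGS.index)
```

Keeping such helpers free of any OWTF dependency is what would make them easy to cover with unit tests.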
First contribution - Easy way for new attributes
In order to set a default ranking for each plugin, I needed an easy way to declare a new attribute for each plugin.
Before - Some magic shell
OWTF retrieves the DESCRIPTION attribute using a shell command. The DESCRIPTION variable is declared in each plugin and briefly describes its purpose.
    class PluginDB(object):
        # [. . .]
        def LoadFromFileSystem(self):
            # This command finds all the plugins and gets their descriptions
            # in one go.
            PluginFinderCommand = "for i in $(find " + \
                self.Core.Config.FrameworkConfigGet('PLUGINS_DIR') + \
                " -name '*.py'); " \
                "do echo \"$i#$(grep ^DESCRIPTION $i|sed 's/ = /=/'|" \
                "cut -f2 -d=)\"; done | sort"
            session = self.PluginDBSession()
            for line in self.Core.Shell.shell_exec(PluginFinderCommand).split("\n"):
                if not line:
                    continue  # Skip blank lines.
                # Remove plugin directory part of the path.
                Plugin = line.strip().replace(
                    self.Core.Config.FrameworkConfigGet('PLUGINS_DIR'), '')
                PluginFile, PluginDescrip = Plugin.split('#')
                # Get rid of surrounding quotes.
                PluginDescrip = PluginDescrip[1:-1]
                PluginChunks = PluginFile.split('/')
                # i.e. all modules have a group. i.e. for web plugins: types
                # are -> passive, semi_passive, active, grep.
                if (len(PluginChunks) == 3):
                    PluginGroup, PluginType, PluginFile = PluginChunks
                    PluginName, PluginCode = PluginFile.split('@')
                    # Get rid of the ".py"
                    PluginCode = PluginCode.split('.')[0]
                    session.merge(
                        models.Plugin(
                            key=PluginType + '@' + PluginCode,
                            group=PluginGroup,
                            type=PluginType,
                            title=PluginName.title().replace('_', ' '),
                            name=PluginName,
                            code=PluginCode,
                            file=PluginFile,
                            descrip=PluginDescrip
                        )
                    )
            session.commit()
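The PluginFinderCommand string holds the magic shell command that retrieves the DESCRIPTION attribute. For each plugin file, it greps the DESCRIPTION line and only outputs the part after the equals sign, that is, the quoted description string. The Python code then strips the surrounding quotes.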
It is easy to see how hard this system becomes to maintain as soon as one wants to add a new attribute. If one wants to add a boolean attribute for instance, the output from the shell command will then have to be passed to a Python function like strtobool.
The main drawback of this function is that it is not modular. In order to add a new attribute for a plugin, a lot of lines have to be modified:
- The shell command
- The cast of its output if needed
- Saving the cast output into the database.
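The sketch below illustrates the casting problem in plain Python. It roughly mimics what the grep, sed and cut pipeline does to a plugin file and shows that every value comes back as text. The helper names extract_value and to_bool, as well as the ENABLED attribute, are hypothetical and only serve the example.

```python
def extract_value(source, name):
    """Roughly mimic: grep ^NAME | sed 's/ = /=/' | cut -f2 -d=."""
    for line in source.splitlines():
        if line.startswith(name):
            fields = line.replace(' = ', '=', 1).split('=')
            # Like `cut -f2 -d=`, keep only the second '='-separated field.
            return fields[1] if len(fields) > 1 else line
    return None


def to_bool(raw):
    """Hypothetical cast for a boolean attribute that was read as text."""
    value = raw.strip().lower()
    if value in ('true', '1', 'yes'):
        return True
    if value in ('false', '0', 'no'):
        return False
    raise ValueError('Not a boolean: %r' % raw)


# ENABLED is an invented attribute, used only to show the cast.
PLUGIN_SOURCE = 'DESCRIPTION = "Example plugin"\nENABLED = True\n'

description = extract_value(PLUGIN_SOURCE, 'DESCRIPTION')[1:-1]
enabled = to_bool(extract_value(PLUGIN_SOURCE, 'ENABLED'))
```

Note that, because only the second field is kept, a value that itself contains an equals sign would be truncated, which is one more reason to move away from text parsing.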
After - Some magic python
For my project, I will have to add attributes to each plugin: at least one that will hold the default ranking value (among High, Medium, Low and Informational). Therefore I wanted to rewrite LoadFromFileSystem in a more Pythonic way, for instance by using the imp module.
The idea was to dynamically load each plugin and directly retrieve its attributes. When discussing with the other OWTF contributors, it came up that it would be a good thing to have a dictionary for all the extra attributes that might appear in the future.
By coupling the dynamic loading with JSON encoding, the extra attributes are encoded into one single JSON string and saved into the database. That way, no further database schema modification is needed each time an attribute is added.
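As a standalone illustration of this idea, here is a minimal sketch that loads a plugin file as a module and encodes its optional ATTR dictionary to JSON. It uses importlib instead of the imp module mentioned above, so that it runs on recent Python versions; that swap, the helper name load_plugin_info, the ranking key and the sample file name are all assumptions for the example.

```python
import importlib.util
import json
import os
import tempfile


def load_plugin_info(plugin_path):
    """Load a plugin file as a module and return (description, attr_json)."""
    module_name = os.path.splitext(os.path.basename(plugin_path))[0]
    spec = importlib.util.spec_from_file_location(module_name, plugin_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    # ATTR is optional: plugins that do not define it get attr_json=None.
    attr = getattr(module, 'ATTR', None)
    attr_json = json.dumps(attr, sort_keys=True) if attr is not None else None
    return module.DESCRIPTION, attr_json


# Write a throwaway plugin file to demonstrate the loading.
with tempfile.TemporaryDirectory() as plugin_dir:
    path = os.path.join(plugin_dir, 'example@OWTF-EX-001.py')
    with open(path, 'w') as handle:
        handle.write('DESCRIPTION = "Example plugin"\n'
                     'ATTR = {"ranking": "Low"}\n')
    description, attr_json = load_plugin_info(path)
```

Since the whole dictionary ends up in a single string, a new key only changes the content of that string, not the shape of the database row.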
Here is the final version of the implementation of the attributes dictionary:
    import imp
    import json
    import os


    class PluginDB(object):
        # [. . .]
        def LoadFromFileSystem(self):
            session = self.PluginDBSession()
            # Retrieve the list of the plugins (sorted) from the directory
            # given by 'PLUGINS_DIR'.
            plugins = []
            for root, _, files in os.walk(
                    self.Core.Config.FrameworkConfigGet('PLUGINS_DIR')):
                plugins.extend([
                    os.path.join(root, filename)
                    for filename in files
                    if filename.endswith('.py')])
            plugins = sorted(plugins)
            # Retrieve the information of the plugin.
            for plugin_path in plugins:
                # Only keep the relative path to the plugin
                plugin = plugin_path.replace(
                    self.Core.Config.FrameworkConfigGet('PLUGINS_DIR'), '')
                # Retrieve the group, the type and the file of the plugin.
                chunks = plugin.split(os.path.sep)
                if len(chunks) == 3:
                    group, type, file = chunks
                    # Retrieve the internal name and code of the plugin.
                    name, code = os.path.splitext(file)[0].split('@')
                    # Load the plugin as a module.
                    module_file, pathname, desc = imp.find_module(
                        os.path.splitext(os.path.basename(plugin_path))[0],
                        [os.path.dirname(plugin_path)])
                    try:
                        plugin_module = imp.load_module(
                            os.path.splitext(file)[0],
                            module_file, pathname, desc)
                    finally:
                        # The caller is responsible for closing the file.
                        if module_file:
                            module_file.close()
                    # Try to retrieve the `ATTR` dictionary from the module
                    # and convert it to json in order to save it into the
                    # database.
                    attr = None
                    try:
                        attr = json.dumps(plugin_module.ATTR)
                    except AttributeError:
                        # The plugin didn't define an attr dict.
                        pass
                    # Save the plugin into the database.
                    session.merge(
                        models.Plugin(
                            key=type + '@' + code,
                            group=group,
                            type=type,
                            title=name.title().replace('_', ' '),
                            name=name,
                            code=code,
                            file=file,
                            descrip=plugin_module.DESCRIPTION,
                            attr=attr
                        )
                    )
            session.commit()
From now on, each plugin can be extended simply by:
- Defining a new key/value pair in its ATTR dictionary
I went from a three-step to a one-step modification in order to add a default ranking value for a plugin :)
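Reading such an attribute back could then look like the sketch below. The key name ranking, the fallback value and the helper get_default_ranking are assumptions for illustration. The only thing taken from the implementation above is that the ATTR dictionary is stored as a JSON string, or as None when the plugin does not define one.

```python
import json

# Assumed fallback for plugins without a declared ranking.
DEFAULT_RANKING = 'Informational'


def get_default_ranking(attr_json):
    """Decode the stored JSON string and return the default ranking."""
    attr = json.loads(attr_json) if attr_json else {}
    return attr.get('ranking', DEFAULT_RANKING)


# A plugin that declared ATTR = {'ranking': 'High'} in its source file.
declared = get_default_ranking(json.dumps({'ranking': 'High'}))
# A plugin without any ATTR dictionary is stored with attr=None.
missing = get_default_ranking(None)
```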
Second contribution - Classful plugin system
After spending some time on OWTF's plugin system, an idea grew in my mind: implement a classful plugin system.
The current version of the project has a plugin_helper file that declares every useful function that a plugin might need. I thought it could be better to split that file into a set of classes, each of them specialized for one type of plugin: for instance an ActivePlugin, a PassivePlugin, etc.
This classful system aims to be more modular than the current one and nicer in its coding style.
I am still working on it but I already have a draft version of the ActivePlugin class implementation. Please keep in mind that the code below is a work in progress.
Abstract plugin
In the current hierarchy, every plugin class inherits from an abstract one that provides the default methods.
    import os

    # TEST_GROUPS, VALID_TYPES, WEB_GROUP, NET_GROUP, AUX_GROUP and
    # clean_filename are assumed to be defined elsewhere in the framework
    # (their imports are not part of this draft).


    class AbstractPlugin(object):
        """Abstract plugin declaring basic methods."""

        RESOURCES = None

        def __init__(self, core, plugin_info, resources=None, *args, **kwargs):
            """Self-explanatory."""
            # A plugin has a reference to the Core object.
            self.core = core
            # Keep track of the abort
            self.framework_abort = False
            self.plugin_abort = False
            # Keep track of the elapsed time
            self.elapsed_time = None
            # A plugin contains several information like a group, a type, etc.
            self.info = None
            if AbstractPlugin.is_valid_info(plugin_info):
                self.info = plugin_info
            else:
                # The information are not valid, throw something
                # TODO: Create a custom error maybe?
                raise ValueError(
                    "The information of the plugin did not fulfill "
                    "the requirements.")
            # Plugin might have a resource which might contain the command
            # that will be run for instance.
            self.resources = resources or self.RESOURCES
            if self.resources is not None:
                if isinstance(self.resources, basestring):
                    self.resources = self.core.DB.Resource.GetResources(
                        self.resources)
                else:  # Assuming that resources is a list.
                    self.resources = self.core.DB.Resource.GetResourceList(
                        self.resources)
            # The output of a plugin is saved into its attribute `output` and
            # its type is saved into `type`.
            self.output = None
            self.type = None

        def run(self):
            """Callback function that actually runs the plugin."""
            raise NotImplementedError('A plugin MUST implement the run method.')

        @staticmethod
        def is_valid_info(info):
            """Check that the information of a plugin is correct."""
            # Check if a group is specified and if it is a valid one.
            if 'group' not in info or info['group'] not in TEST_GROUPS:
                return False
            # Check if a type is specified and if it is a valid one.
            if 'type' not in info or info['type'] not in VALID_TYPES:
                return False
            # TODO: Check the other info.
            # Everything's fine about the information
            return True

        def _init_output_dir(self):
            """Create the output directory of the plugin and store its path."""
            # Retrieve the relative path of the plugin output.
            base_path = ''
            if self.info['group'] in [WEB_GROUP, NET_GROUP]:
                base_path = self.core.DB.Target.GetPath(
                    'PARTIAL_URL_OUTPUT_PATH')
            elif self.info['group'] == AUX_GROUP:
                base_path = self.core.Config.Get('AUX_OUTPUT_PATH')
            output_dir = os.path.join(
                base_path,
                os.path.join(
                    clean_filename(self.info['title']),
                    self.info['type'])
            )
            # FULL output path for plugins to use
            self.core.DB.Target.SetPath(
                'PLUGIN_OUTPUT_DIR',
                os.path.join(os.getcwd(), output_dir))
            # Force the creation of the directory if it does not exist yet.
            self.core.CreateMissingDirs(output_dir)
            self.output_dir = output_dir

        def dump(self, type='type', output='output'):
            """Return the result of a plugin.

            Generate a dictionary from the attributes `type` and `output` and
            returns a list of it.

            """
            return [{type: self.type, output: self.output}]
As you can see, there are a lot of TODOs in the code since it is a WIP.
At the moment, this abstract class provides three basic methods:
- is_valid_info is a static method that checks if the information of a plugin is correct.
- _init_output_dir is an internal method that will create the output plugin directory if it is missing.
- dump is a method that acts like the Python __repr__ function since it translates the output of the plugin into something understandable for OWTF.
Active plugin
At the moment, I am still working on the relationships between each layer and each class.
So far, a plugin may or may not need to run a shell command in order to run a specific tool. Almost all the active plugins need to run a command, whereas passive plugins do not.
Therefore, I have implemented an extra class layer called AbstractRunCommandPlugin that only implements one new method, allowing a plugin to run an actual shell command according to its resources.
    class AbstractRunCommandPlugin(AbstractPlugin):
        """Abstract plugin that runs a shell command."""

        def __init__(self, *args, **kwargs):
            """Self-explanatory."""
            AbstractPlugin.__init__(self, *args, **kwargs)
            self.cmd_modified = None
            self.raw_output = None

        def run_command(self, cmd):
            """Run the shell command of the plugin."""
            if not hasattr(self, 'output_dir'):
                self._init_output_dir()
            # Keep track of the elapsed time.
            self.core.Timer.StartTimer('run_command')
            self.cmd_modified = self.core.Shell.GetModifiedShellCommand(
                cmd,
                self.output_dir)
            # Run the shell command.
            try:
                self.raw_output = self.core.Shell.shell_exec_monitor(
                    self.cmd_modified)
            except PluginAbortException as partial_output:
                self.raw_output = str(partial_output.parameter)
                self.plugin_abort = True
            except FrameworkAbortException as partial_output:
                self.raw_output = str(partial_output)
                self.framework_abort = True
            # Save the elapsed time.
            self.elapsed_time = self.core.Timer.GetElapsedTimeAsStr(
                'run_command')
            log('Time=' + self.elapsed_time)
Then the ActivePlugin class inherits from AbstractRunCommandPlugin and implements the default run() method. This method will then be called by OWTF.
    class ActivePlugin(AbstractRunCommandPlugin):
        """Active plugin."""

        def __init__(self, core, plugin_info, resources=None,
                     cmd_intro='Test command', output_intro='Output',
                     prev_output=None, *args, **kwargs):
            """Self-explanatory."""
            AbstractRunCommandPlugin.__init__(
                self, core, plugin_info, resources, *args, **kwargs)
            self.cmd_intro = cmd_intro
            self.output_intro = output_intro
            # Default to an empty list so that it can be concatenated with
            # the plugin output when an abort occurs.
            self.prev_output = prev_output or []
            self._init_output_dir()

        def run(self):
            """Callback function which is run by OWTF.

            Default ActivePlugin behaviour.
            This function can be overridden by the user when declaring an
            ActivePlugin. That way, the user can take into account specific
            usages.

            """
            return self.command_run()

        def command_run(self):
            """Run the plugin command and format its output."""
            output_list = []
            for name, cmd in self.resources:
                self.run_command(cmd)
                self.type = 'CommandDump'
                self.output = {
                    # TODO: Write GetCommandOutputFileNameAndExtension
                    'Name': None,
                    'CommandIntro': self.cmd_intro,
                    'ModifiedCommand': self.cmd_modified,
                    'RelativeFilePath': self.core.PluginHandler.DumpOutputFile(
                        name,
                        self.raw_output,
                        self.info,
                        RelativePath=True),
                    'OutputIntro': self.output_intro,
                    'TimeStr': self.elapsed_time}
                plugin_output = self.dump()
                # This command returns URLs for processing
                if name == self.core.Config.FrameworkConfigGet(
                        'EXTRACT_URLS_RESERVED_RESOURCE_NAME'):
                    plugin_output = self.log_urls()
                if self.plugin_abort:
                    raise PluginAbortException(
                        self.prev_output + plugin_output)
                if self.framework_abort:
                    raise FrameworkAbortException(
                        self.prev_output + plugin_output)
                output_list += plugin_output
            return output_list

        # TODO: Write the doc string.
        def log_urls(self):
            # Keep track of the elapsed time.
            self.core.Timer.StartTimer('log_urls')
            urls = self.raw_output.strip().split('\n')
            self.core.DB.URL.ImportUrls(urls)
            nb_found = 0
            visit_urls = False
            # TODO: Whether or not active testing will depend on the user
            # profile ;). Have cool ideas for profile names
            if True:
                visit_urls = True
                nb_found = sum([
                    transaction.Found
                    for transaction in self.core.Requester.GetTransactions(
                        True,
                        self.core.DB.URL.GetURLsToVisit())
                ])
            self.elapsed_time = self.core.Timer.GetElapsedTimeAsStr('log_urls')
            log('Spider/URL scraper time=' + self.elapsed_time)
            self.type = 'URLsFromStr'
            self.output = {
                'TimerStr': self.elapsed_time,
                'VisitUrls': visit_urls,
                'URLList': urls,
                'NumFound': nb_found}
            return self.dump()
With the current version of the classful plugin system, I went from a basic active plugin like:
    DESCRIPTION = "Active Vulnerability Scanning without credentials via Arachni"


    def run(Core, PluginInfo):
        return Core.PluginHelper.CommandDump(
            'Test Command',
            'Output',
            Core.DB.Resource.GetResources('Arachni_Unauth'),
            PluginInfo,
            [])
To a classful version like:
    from framework.plugin.plugins import ActivePlugin


    class ArachniUnauthPlugin(ActivePlugin):
        """Active Vulnerability Scanning without credentials via Arachni."""

        RESOURCES = 'Arachni_Unauth'
From my point of view, this new system has several pros:
- Declaring a basic active plugin requires less code than before.
- The default behaviour can easily be modified by overriding the run method.
- The DESCRIPTION attribute becomes the docstring of the class, which looks better in my opinion.
- The source code is more modular than the previous one.
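To illustrate the point about overriding run, here is a small self-contained sketch. The ActivePlugin class below is only a stand-in stub written for this example (the real draft needs the OWTF core to run), and the plugin names, the resource name and the post-processing step are hypothetical.

```python
class ActivePlugin(object):
    """Minimal stand-in for the draft ActivePlugin class (stub only)."""

    RESOURCES = None

    def __init__(self, resources=None):
        self.resources = resources or self.RESOURCES

    def command_run(self):
        # The real method runs shell commands; this stub only echoes the
        # name of the resource it was given.
        return ['ran ' + self.resources]

    def run(self):
        """Default behaviour: simply run the command."""
        return self.command_run()


class DefaultPlugin(ActivePlugin):
    """Plugin relying on the default behaviour."""

    RESOURCES = 'Example_Resource'


class CustomPlugin(ActivePlugin):
    """Plugin overriding `run` to post-process the default output."""

    RESOURCES = 'Example_Resource'

    def run(self):
        output = ActivePlugin.run(self)
        return output + ['custom post-processing']
```

A plugin that is happy with the default behaviour only declares its resources, while a more specific one changes a single method and leaves the rest of the class untouched.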
I still have a lot of work to do on this system since I only consider the active plugins for now. I will also have to think about the ATTR dictionary shown in the previous part.
Conclusion
During these first three weeks of the GSoC 2014 working on the OWASP - OWTF project, I have managed to improve a few things about the code:
- Allow the user to add a new attribute for a plugin without requiring modification of the source code.
- A classful plugin system that eases the creation of new plugins for the user.
Of course, the second point is not complete yet but I am confident that I will finish this system soon.
See you next month for the second monthly GSoC post on my work!