GSoC 2014 - Working for OWASP on OWTF

Published on Saturday, 10 May 2014 in Gsoc 2014, Python ; tagged with gsoc, 2014, project, owasp, owtf, security, pentest, python

I applied for the 2014 edition of the Google Summer of Code a couple of months ago, and I have been accepted!
Therefore, for the next few months I will be working on the OWASP - OWTF project.

This first post describes the OWTF tool and the project I have to implement by August.
Then it explains the few contributions I have been working on over the past three weeks.

It is the first post of a monthly series that will show my progress on the Automated Ranking System I am implementing.

The OWASP - OWTF project

The OWASP Offensive Web Testing Framework (OWTF) aims to provide an efficient way to combine the out-of-the-box thinking that only a human can provide with the automated work of a machine. It gathers a complete set of plugins and merges their results into one complete interactive report. The pentester can then add notes, change details, and add media such as screenshots in order to produce a final report.

In other words, OWTF is a tool that automatically runs a lot of plugins against targets chosen by a user.
Then, it gathers the results into one report that the user can modify and even use as the draft version of a test report.

In my opinion, this is something really cool and it is awesome to be part of its development.

The automated ranking system

The current version of OWTF provides complete interactive reports of web and network tests. But it does not provide automated vulnerability rankings yet.
My project is to enhance OWTF in order to provide an automated ranking for each plugin. This will allow the human to focus attention on the most likely weak areas of a web application or network first, which will be valuable to efficiently use the available time in a penetration test.
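
To illustrate what "focusing on the most likely weak areas first" could mean in practice, here is a minimal sketch that orders plugin results by ranking. The four ranking levels are the ones used later in this post; the result dictionaries, the plugin names and the function name are assumptions made for the example, not OWTF code.

```python
# Ranking scale ordered from most to least severe (levels from this post).
RANKING_ORDER = {'High': 0, 'Medium': 1, 'Low': 2, 'Informational': 3}


def sort_by_ranking(results):
    """Return plugin results sorted from the most to the least severe."""
    return sorted(results, key=lambda result: RANKING_ORDER[result['ranking']])


# Hypothetical plugin results, only used to exercise the function.
results = [
    {'plugin': 'plugin_a', 'ranking': 'Low'},
    {'plugin': 'plugin_b', 'ranking': 'High'},
    {'plugin': 'plugin_c', 'ranking': 'Informational'},
]
for result in sort_by_ranking(results):
    print(result['ranking'], result['plugin'])
```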

Long story short, my project will give OWTF the ability to rank the plugins according to what they have discovered.
The current flowchart looks like the following:

[Figure: OWTF Automated ranking system]

The project will require a lot of work, mostly on how to retrieve rankings from tools.
Getting in touch with the contributors from the ThreadFix project gave me a good overview of what to do.

Currently, the idea is to create a standalone library that will parse the outputs of tools that already provide automated rankings.
It will have to support most of the existing tools and most of their versions. Being standalone will also make it easier to build a test framework around it.
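
As a rough idea of what such a library might do, the sketch below normalizes the severity labels of a tool report into a common ranking scale. Everything here is an assumption made for illustration: the JSON report layout, the label mapping and the function name do not come from any real tool or from OWTF.

```python
import json

# Hypothetical mapping from a tool's own severity labels to common rankings.
SEVERITY_TO_RANKING = {
    'critical': 'High',
    'high': 'High',
    'medium': 'Medium',
    'low': 'Low',
    'info': 'Informational',
}


def parse_rankings(raw_output):
    """Extract normalized rankings from a (made-up) JSON tool report."""
    report = json.loads(raw_output)
    rankings = []
    for issue in report.get('issues', []):
        label = issue.get('severity', '').lower()
        # Unknown labels are skipped instead of guessed.
        if label in SEVERITY_TO_RANKING:
            rankings.append(SEVERITY_TO_RANKING[label])
    return rankings


sample = ('{"issues": [{"severity": "HIGH"}, {"severity": "info"}, '
          '{"severity": "weird"}]}')
print(parse_rankings(sample))
```

Because the parser only takes a string and returns a list, it can be tested without running OWTF at all, which is the main appeal of keeping it standalone.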

First contribution - Easy way for new attributes

In order to set a default ranking for each plugin, I needed an easy way to declare a new attribute for each plugin.

Before - Some magic shell

OWTF retrieves the DESCRIPTION attribute using a shell command. The DESCRIPTION variable is declared in each plugin and briefly describes its aim.

class PluginDB(object):
    # [. . .]
    def LoadFromFileSystem(self):
        # This command finds all the plugins and gets their descriptions
        # in one go.
        PluginFinderCommand = "for i in $(find " + \
            self.Core.Config.FrameworkConfigGet('PLUGINS_DIR') + \
            " -name '*.py'); " \
            "do echo \"$i#$(grep ^DESCRIPTION $i|sed 's/ = /=/'|" \
            "cut -f2 -d=)\"; done | sort"
        session = self.PluginDBSession()
        for line in self.Core.Shell.shell_exec(PluginFinderCommand).split("\n"):
            if not line:
                continue  # Skip blank lines.
            Plugin = line.strip().replace(
                self.Core.Config.FrameworkConfigGet('PLUGINS_DIR'),
                '')  # Remove plugin directory part of the path.
            PluginFile, PluginDescrip = Plugin.split('#')
            # Get rid of surrounding quotes.
            PluginDescrip = PluginDescrip[1:-1]
            PluginChunks = PluginFile.split('/')
            # i.e. all modules have a group. i.e. for web plugins: types
            # are -> passive, semi_passive, active, grep.
            if (len(PluginChunks) == 3):
                PluginGroup, PluginType, PluginFile = PluginChunks
            PluginName, PluginCode = PluginFile.split('@')
            PluginCode = PluginCode.split('.')[0] # Get rid of the ".py"
            session.merge(
                models.Plugin(
                    key=PluginType + '@' + PluginCode,
                    group=PluginGroup,
                    type=PluginType,
                    title=PluginName.title().replace('_', ' '),
                    name=PluginName,
                    code=PluginCode,
                    file=PluginFile,
                    descrip=PluginDescrip
                )
            )
        session.commit()

From line 6 to 10 we can read the magic shell command that retrieves the DESCRIPTION attribute.
For each plugin file, it greps the DESCRIPTION line and only keeps the quoted string that follows the '=' sign. The surrounding '"' are stripped afterwards in Python.

It is easy to see how hard this system becomes to maintain as soon as one wants to add a new attribute.
If one wants to add a boolean attribute for instance, the output of the shell command then has to be passed to a Python function like strtobool.

The main drawback of this function is that it is not modular. In order to add a new attribute for a plugin, several parts have to be modified:

  1. The shell command
  2. The cast of its output if needed
  3. Saving the cast output into the database.
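
To make the casting problem concrete, here is a small sketch that mimics the `sed`/`cut` part of the shell command in pure Python. Whatever the attribute is, the result is always a string, so anything else needs an explicit cast. The ENABLED attribute and the str_to_bool helper are hypothetical and only serve the example.

```python
def extract_value(line):
    """Mimic `sed 's/ = /=/' | cut -f2 -d=` on one line of a plugin file."""
    return line.replace(' = ', '=', 1).split('=')[1]


def str_to_bool(value):
    """Cast a textual flag to a boolean (hypothetical helper)."""
    value = value.strip().strip('"\'').lower()
    if value in ('true', 'yes', '1'):
        return True
    if value in ('false', 'no', '0'):
        return False
    raise ValueError('Not a boolean: %r' % value)


# The description still carries its quotes, hence the [1:-1] slice.
description = extract_value('DESCRIPTION = "Some plugin"')[1:-1]
# ENABLED is a made-up attribute: its value comes back as the string 'True'.
enabled = str_to_bool(extract_value('ENABLED = True'))
print(description, enabled)
```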

After - Some magic python

For my project, I will have to add attributes to each plugin, at least one holding the default ranking value (among High, Medium, Low and Informational).
Therefore I wanted to rewrite LoadFromFileSystem in a more Pythonic way, for instance by using the imp module.

The idea was to dynamically load each plugin and directly retrieve its attributes. When discussing with the other OWTF contributors, it came up that it would be a good thing to have a dictionary for all the extra attributes that might appear in the future.

By coupling the dynamic loading with JSON encoding, the extra attributes are encoded into one single JSON string and saved into the database. That way, no extra database schema modification is needed when a new attribute appears.
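
The round trip can be sketched in a few lines. The attribute keys and the two helper names below are assumptions chosen for the example; only the idea of storing one JSON string per plugin comes from the design described above.

```python
import json


def encode_attr(attr):
    """Encode the extra attributes as one JSON string, or None if absent."""
    return json.dumps(attr, sort_keys=True) if attr is not None else None


def decode_attr(encoded):
    """Decode the stored JSON string back into a dictionary."""
    return json.loads(encoded) if encoded is not None else {}


# Hypothetical attributes of a plugin.
attr = {'ranking': 'High', 'needs_credentials': False}
stored = encode_attr(attr)   # what would be saved in a single text column
print(stored)
print(decode_attr(stored) == attr)
```

Adding a third key to the dictionary changes the stored string but not the column that holds it.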

Here is the final version of the implementation of the attributes dictionary:

import imp
import json
import os


class PluginDB(object):
    # [. . .]
    def LoadFromFileSystem(self):
        # Retrieve the list of the plugins (sorted) from the directory given
        # by 'PLUGINS_DIR'.
        plugins_dir = self.Core.Config.FrameworkConfigGet('PLUGINS_DIR')
        plugins = []
        for root, _, files in os.walk(plugins_dir):
            plugins.extend([
                os.path.join(root, filename) for filename in files
                if filename.endswith('.py')])
        plugins = sorted(plugins)
        session = self.PluginDBSession()
        # Retrieve the information of the plugin.
        for plugin_path in plugins:
            # Only keep the relative path to the plugin.
            plugin = plugin_path.replace(plugins_dir, '')
            # Retrieve the group, the type and the file of the plugin.
            chunks = plugin.split(os.path.sep)
            if len(chunks) == 3:
                group, type, file = chunks
            # Retrieve the internal name and code of the plugin.
            name, code = os.path.splitext(file)[0].split('@')
            # Load the plugin as a module.
            module_file, pathname, desc = imp.find_module(
                os.path.splitext(os.path.basename(plugin_path))[0],
                [os.path.dirname(plugin_path)])
            try:
                plugin_module = imp.load_module(
                    os.path.splitext(file)[0],
                    module_file,
                    pathname,
                    desc)
            finally:
                # imp.find_module leaves the file open; close it ourselves.
                if module_file:
                    module_file.close()
            # Try to retrieve the `ATTR` dictionary from the module and
            # convert it to json in order to save it into the database.
            attr = None
            try:
                attr = json.dumps(plugin_module.ATTR)
            except AttributeError:  # The plugin didn't define an ATTR dict.
                pass
            # Save the plugin into the database.
            session.merge(
                models.Plugin(
                    key=type + '@' + code,
                    group=group,
                    type=type,
                    title=name.title().replace('_', ' '),
                    name=name,
                    code=code,
                    file=file,
                    descrip=plugin_module.DESCRIPTION,
                    attr=attr
                )
            )
        session.commit()

From now on, each plugin can be extended simply by:

  1. Defining a new key/value in its ATTR dictionary

I went from a three-step to a one-step modification in order to add a default ranking value for a plugin :)
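
Here is a self-contained sketch of that single step seen from the plugin side. It writes a throwaway plugin file declaring an ATTR dictionary, loads it dynamically and reads the attributes back. Note the assumptions: it targets Python 3 and uses importlib instead of the imp module used in OWTF, and the plugin file name, its code and the 'ranking' key are made up.

```python
import importlib.util
import json
import os
import tempfile

# Content of a hypothetical plugin: adding an attribute is one more key.
PLUGIN_SOURCE = (
    'DESCRIPTION = "Example plugin"\n'
    'ATTR = {"ranking": "Medium"}\n'
)


def load_plugin(path):
    """Load a plugin file as a module, whatever its file name is."""
    module_name = os.path.splitext(os.path.basename(path))[0]
    spec = importlib.util.spec_from_file_location(module_name, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


with tempfile.TemporaryDirectory() as plugins_dir:
    # File names follow the `name@code.py` convention; the code is invented.
    path = os.path.join(plugins_dir, 'Example_plugin@OWTF-XX-000.py')
    with open(path, 'w') as handle:
        handle.write(PLUGIN_SOURCE)
    plugin = load_plugin(path)
    # A plugin without ATTR simply yields None.
    attr = getattr(plugin, 'ATTR', None)
    print(plugin.DESCRIPTION, json.dumps(attr))
```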

Second contribution - Classful plugin system

After spending some time on OWTF's plugin system, an idea grew in my mind: implementing a classful plugin system.

The current version of the project has a plugin_helper file that declares every useful function that a plugin might need.
I thought it would be better to split that file into a set of classes, each of them specialized for one type of plugin. For instance an ActivePlugin, a PassivePlugin, etc.

This classful system aims to be more modular than the current one and nicer in its coding style.

I am still working on it but I already have a draft version of the ActivePlugin class implementation. Please keep in mind that the code below is a work in progress.

Abstract plugin

In the current hierarchy, every plugin class inherits from an abstract one that provides the default methods.

class AbstractPlugin(object):
    """Abstract plugin declaring basic methods."""

    RESOURCES = None

    def __init__(self, core, plugin_info, resources=None, *args, **kwargs):
        """Self-explanatory."""
        # A plugin has a reference to the Core object.
        self.core = core
        # Keep track of the abort
        self.framework_abort = False
        self.plugin_abort = False
        # Keep track of the elapsed time
        self.elapsed_time = None
        # A plugin contains several information like a group, a type, etc.
        self.info = None
        if AbstractPlugin.is_valid_info(plugin_info):
            self.info = plugin_info
        else: # The information are not valid, throw something
            # TODO: Create a custom error maybe?
            raise ValueError(
                "The information of the plugin did not fulfill "
                "the requirements.")
        # A plugin might have a resource, which might contain the command
        # that will be run for instance.
        self.resources = resources or self.RESOURCES
        if self.resources is not None:
            if isinstance(self.resources, basestring):
                self.resources = self.core.DB.Resource.GetResources(
                    self.resources)
            else: # Assuming that resources is a list.
                self.resources = self.core.DB.Resource.GetResourceList(
                    self.resources)
        # The output of a plugin is saved into its attribute `output` and its
        # type is saved into `type`.
        self.output = None
        self.type = None

    def run(self):
        """Callback function that actually runs the plugin."""
        raise NotImplementedError('A plugin MUST implement the run method.')

    @staticmethod
    def is_valid_info(info):
        """Check that the information of a plugin is correct."""
        # Check if a group is specified and if it is a valid one.
        if 'group' not in info or info['group'] not in TEST_GROUPS:
            return False
        # Check if a type is specified and if it is a valid one.
        if 'type' not in info or info['type'] not in VALID_TYPES:
            return False
        # TODO: Check the other info.
        # Everything's fine about the information
        return True

    def _init_output_dir(self):
        """Create the output directory of the plugin and store its path."""
        # Retrieve the relative path of the plugin output.
        base_path = ''
        if self.info['group'] in [WEB_GROUP, NET_GROUP]:
            base_path = self.core.DB.Target.GetPath('PARTIAL_URL_OUTPUT_PATH')
        elif self.info['group'] == AUX_GROUP:
            base_path = self.core.Config.Get('AUX_OUTPUT_PATH')
        output_dir = os.path.join(
            base_path,
            os.path.join(
                clean_filename(self.info['title']), self.info['type'])
            )
        # FULL output path for plugins to use
        self.core.DB.Target.SetPath(
            'PLUGIN_OUTPUT_DIR',
            os.path.join(os.getcwd(), output_dir))
        # Force the creation of the directory if it does not exist yet.
        self.core.CreateMissingDirs(output_dir)
        self.output_dir = output_dir

    def dump(self, type='type', output='output'):
        """Return the result of a plugin.

        Generate a dictionary from the attributes `type` and `output` and
        returns a list of it.

        """
        return [dict({type: self.type, output: self.output})]

As you can see, there are a lot of TODOs in the code since it is a WIP.

At the moment, this abstract class provides three basic methods:

  1. is_valid_info is a static method that checks if the information of a plugin is correct.
  2. _init_output_dir is an internal method that will create the output plugin directory if it is missing.
  3. dump is a method that plays a role similar to Python's __repr__ since it translates the output of the plugin into something understandable for OWTF.
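
To show the contract between the abstract class and its children without the Core object or the database, here is a stripped-down sketch. MiniPlugin and EchoPlugin are stand-ins invented for the example; only the run/dump behaviour is modelled on the class above.

```python
class MiniPlugin(object):
    """Stripped-down stand-in for AbstractPlugin (no Core, no database)."""

    def __init__(self):
        self.type = None
        self.output = None

    def run(self):
        """Children have to provide the actual behaviour."""
        raise NotImplementedError('A plugin MUST implement the run method.')

    def dump(self, type='type', output='output'):
        """Wrap the type and the output of the plugin into a list."""
        return [{type: self.type, output: self.output}]


class EchoPlugin(MiniPlugin):
    """Hypothetical plugin that only stores a fixed output."""

    def run(self):
        self.type = 'CommandDump'
        self.output = {'Name': 'echo'}
        return self.dump()


print(EchoPlugin().run())
```

Since dump always returns a list, the outputs of several commands can be concatenated with a simple `+=`.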

Active plugin

At the moment, I am still working on the relationships between each layer and each class.
So far, a plugin may or may not need to run a shell command in order to run a specific tool. Almost all the active plugins need to run a command, whereas passive plugins do not.

Therefore, I have implemented an extra class layer called AbstractRunCommandPlugin that only implements a new method, allowing a plugin to run an actual shell command according to its resources.

class AbstractRunCommandPlugin(AbstractPlugin):
    """Abstract plugin that runs a shell command."""

    def __init__(self, *args, **kwargs):
        """Self-explanatory."""
        AbstractPlugin.__init__(self, *args, **kwargs)
        self.cmd_modified = None
        self.raw_output = None

    def run_command(self, cmd):
        """Run the shell command of the plugin."""
        if not hasattr(self, 'output_dir'):
            self._init_output_dir()
        # Keep track of the elapsed time.
        self.core.Timer.StartTimer('run_command')
        self.cmd_modified = self.core.Shell.GetModifiedShellCommand(
            cmd,
            self.output_dir)
        # Run the shell command.
        try:
            self.raw_output = self.core.Shell.shell_exec_monitor(
                self.cmd_modified)
        except PluginAbortException as partial_output:
            self.raw_output = str(partial_output.parameter)
            self.plugin_abort = True
        except FrameworkAbortException as partial_output:
            self.raw_output = str(partial_output)
            self.framework_abort = True
        # Save the elapsed time.
        self.elapsed_time = self.core.Timer.GetElapsedTimeAsStr('run_command')
        log('Time=' + self.elapsed_time)
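
Outside of OWTF, the core of run_command (run something, keep its raw output, measure the elapsed time) can be sketched with the standard library only. This is an assumption-laden illustration for Python 3: it uses subprocess instead of the framework's Shell and Timer objects, takes the command as a list rather than a shell string, and ignores the abort exceptions.

```python
import subprocess
import sys
import time


def run_command(cmd):
    """Run a command and return its raw output and the elapsed seconds."""
    start = time.time()
    process = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # Keep errors in the same raw output.
        universal_newlines=True)
    return process.stdout, time.time() - start


# The Python interpreter itself is used as a portable example command.
raw_output, elapsed = run_command([sys.executable, '-c', 'print("hello")'])
print(raw_output.strip(), 'Time=%.2fs' % elapsed)
```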

Then the ActivePlugin class inherits from AbstractRunCommandPlugin and implements the default run() method. This method will then be called by OWTF.

class ActivePlugin(AbstractRunCommandPlugin):
    """Active plugin."""

    def __init__(self,
                 core,
                 plugin_info,
                 resources=None,
                 cmd_intro='Test command',
                 output_intro='Output',
                 prev_output=None,
                 *args, **kwargs):
        """Self-explanatory."""
        AbstractRunCommandPlugin.__init__(
            self,
            core,
            plugin_info,
            resources,
            *args, **kwargs)
        self.cmd_intro = cmd_intro
        self.output_intro = output_intro
        # Default to an empty list so that it can be concatenated later on.
        self.prev_output = prev_output or []
        self._init_output_dir()

    def run(self):
        """Callback function which is run by OWTF.

        Default ActivePlugin behaviour.
        This function can be overridden by the user when declaring an
        ActivePlugin. That way, the user can take into account specific usages.

        """
        return self.command_run()

    def command_run(self):
        """Run the plugin command and format its output."""
        output_list = []
        for name, cmd in self.resources:
            self.run_command(cmd)
            self.type = 'CommandDump'
            self.output = {
                'Name': None, # TODO: Write GetCommandOutputFileNameAndExtension
                'CommandIntro': self.cmd_intro,
                'ModifiedCommand': self.cmd_modified,
                'RelativeFilePath': self.core.PluginHandler.DumpOuputFile(
                    name,
                    self.raw_output,
                    self.info,
                    RelativePath=True),
                'OutputIntro': self.output_intro,
                'TimeStr': self.elapsed_time}
            plugin_output = self.dump()

            # This command returns URLs for processing
            if name == self.core.Config.FrameworkConfigGet('EXTRACT_URLS_RESERVED_RESOURCE_NAME'):
                plugin_output = self.log_urls()

            if self.plugin_abort:
                raise PluginAbortException(self.prev_output + plugin_output)
            if self.framework_abort:
                raise FrameworkAbortException(self.prev_output + plugin_output)

            output_list += plugin_output
        return (output_list)

    # TODO: Write the doc string.
    def log_urls(self):
        # Keep track of the elapsed time.
        self.core.Timer.StartTimer('log_urls')
        urls = self.raw_output.strip().split('\n')
        self.core.DB.URL.ImportUrls(urls)
        nb_found = 0
        visit_urls = False
        # TODO: Whether or not active testing will depend on the user profile
        # ;). Have cool ideas for profile names
        if True:
            visit_urls = True
            nb_found = sum([
                transaction.Found
                for transaction in self.core.Requester.GetTransactions(
                    True, self.core.DB.URL.GetURLsToVisit())
                ])
        self.elapsed_time = self.core.Timer.GetElapsedTimeAsStr('log_urls')
        log('Spider/URL scraper time=' + self.elapsed_time)
        self.type = 'URLsFromStr'
        self.output = {
            'TimerStr': self.elapsed_time,
            'VisitUrls': visit_urls,
            'URLList': urls,
            'NumFound': nb_found}
        return (self.dump())

With the current version of the classful plugin system, I went from a basic active plugin like:

DESCRIPTION = "Active Vulnerability Scanning without credentials via Arachni"


def run(Core, PluginInfo):
    return Core.PluginHelper.CommandDump(
        'Test Command',
        'Output',
        Core.DB.Resource.GetResources('Arachni_Unauth'),
        PluginInfo,
        [])

To a classful version like:

from framework.plugin.plugins import ActivePlugin


class ArachniUnauthPlugin(ActivePlugin):
    """Active Vulnerability Scanning without credentials via Arachni."""

    RESOURCES = 'Arachni_Unauth'

From my point of view, this new system has several pros:

  1. Declaring a basic active plugin requires less code than before.
  2. The default behaviour can easily be modified by overriding the run method.
  3. The DESCRIPTION attribute becomes the docstring of the class, which looks better in my opinion.
  4. The source code is more modular than the previous one.
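
Two of these points rely on plain Python mechanisms that can be shown in isolation: the class attribute used as a default value, and the class docstring used as the description. In the sketch below, BasePlugin and its description property are hypothetical; only the `resources or self.RESOURCES` fallback is taken from the draft above.

```python
class BasePlugin(object):
    """Minimal stand-in showing the two mechanisms."""

    RESOURCES = None

    def __init__(self, resources=None):
        # An explicit argument wins; otherwise use the class attribute.
        self.resources = resources or self.RESOURCES

    @property
    def description(self):
        """Use the docstring of the class as its description."""
        return (self.__doc__ or '').strip()


class ArachniUnauthPlugin(BasePlugin):
    """Active Vulnerability Scanning without credentials via Arachni."""

    RESOURCES = 'Arachni_Unauth'


plugin = ArachniUnauthPlugin()
print(plugin.resources)
print(plugin.description)
```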

I still have a lot of work to do on this system since I have only considered the active plugins so far. I will also have to think about how the ATTR dictionary shown in the previous part fits into it.

Conclusion

During these first three weeks of GSoC 2014 working on the OWASP - OWTF project, I have managed to improve a few things in the code:

  1. Allow the user to add a new attribute to a plugin without requiring any modification of the framework's source code.
  2. A classful plugin system that eases the creation of new plugins for the user.

Of course, the second point is not complete yet but I am confident that I will finish this system soon.

See you next month for the second monthly GSoC post on my work!


contact - depier.re - License WTFPL2