其他分享
首页 > 其他分享> > OSSIM-agent源代码分析(四)

OSSIM-agent源代码分析(四)

作者:互联网

2021SC@SDUSC
OSSIM-agent源代码分析(四)

OSSIM-agent-config.py下

简述

OSSIM Agent的主要职责是收集网络上存在的各种设备发送的所有数据,然后按照一种标准方式(standardized way)有序的发送给OSSIM Server,Agent收集到数据后在发送给Server之前要对这些数据进行标准化处理,这样Server就可以依一种统一的方式来处理这些信息,并且也简化了Server的处理过程

agent的配置模块是config.py,它是大量数据进行标准化处理的位置,其中定义了对各种数据、文件的处理方式。

相关方法

在plugin.cfg文件中显示,各种常量的设置

    # NOTE(review): presumably the options each listed section must define;
    # not referenced anywhere in this excerpt -- confirm against the loader.
    _NEEDED_CONFIG_ENTRIES = {
        'config': ['type', 'source', 'enable']
    }
    # NOTE(review): not referenced in this excerpt; the name suggests it
    # controls whether a malformed config aborts the agent -- confirm.
    _EXIT_IF_MALFORMED_CONFIG = False
    # Section that _replace_translations() looks values up in.
    TRANSLATION_SECTION = 'translation'
    # Function name recognised in "{translate($var)}" expressions.
    TRANSLATION_FUNCTION = 'translate'
    # Option read as fallback when a value has no translation of its own.
    TRANSLATION_DEFAULT = '_DEFAULT_'
    # Lower-cased section names that rules() does not return as rules.
    # rules() appends "translation-*" section names to this list at runtime.
    SECTIONS_NOT_RULES = ["config", "info", TRANSLATION_SECTION]

    # NOTE(review): not referenced in this excerpt; the concat helpers match
    # the literal "$CONCAT(" in their own regexes instead.
    CONCAT_FUNCTION = 'CONCAT'
    # Bit flags tested with "&" against the `replace` argument of
    # get_replace_value(). The two lowest bits (mask 3) are not flags: they
    # carry the number of variable-replacement rounds.
    _MAP_REPLACE_TRANSLATIONS = 4
    _MAP_REPLACE_USER_FUNCTIONS = 8
    _MAP_REPLACE_CUSTOM_USER_FUNCTIONS = 16
    _MAP_REPLACE_CONCAT = 32      # 0b00100000
    _MAP_REPLACE_TRANSLATE2 = 64  # 0b01000000

用于判断节(section)名是否为翻译节(translation-xxx),以及值中是否包含translate2函数调用的正则表达式

    # Matches names that start with "translation-" followed by at least one
    # non-blank character; rules() applies it to the lower-cased section name.
    __regexIsTranslationSection = re.compile("translation-\S+", re.UNICODE)
    # Matches "{translate2($variable,$translate_section)}" and exposes both
    # arguments (still carrying their "$") as the named groups `variable`
    # and `translate_section`.
    __regex_check_for_translate2_function = re.compile("\{translate2\((?P<variable>\$[^\)]+),(?P<translate_section>\$[^\)]+)\)\}", re.UNICODE)

设置配置、翻译等具体的规则

    def rules(self):
        """Collect the sections of this plugin that describe rules.

        Sections are visited in sorted order. While the ``config`` section's
        ``source`` option is ``"log"``, a section whose ``regexp`` is None or
        empty is skipped. A section whose lower-cased name matches the
        ``translation-*`` pattern is registered in
        ``Plugin.SECTIONS_NOT_RULES`` (once) and therefore never returned.

        Returns a dict mapping each remaining section name to
        ``self.hitems(section, True)``.
        """
        collected = {}
        for name in sorted(self.sections()):
            pattern = self.get(name, "regexp")
            if self.get("config", "source") == "log" and pattern in (None, ""):
                continue

            lowered = name.lower()
            is_translation = \
                Plugin.__regexIsTranslationSection.match(lowered) is not None
            if is_translation and lowered not in Plugin.SECTIONS_NOT_RULES:
                logger.info("Loading new translation section... %s" % lowered)
                Plugin.SECTIONS_NOT_RULES.append(lowered)

            if lowered in Plugin.SECTIONS_NOT_RULES:
                continue
            collected[name] = self.hitems(name, True)
        return collected

通过正则表达式和位置变换进行变量替换

    def _replace_array_variables(self, value, groups):
        for i in range(2):
            search = re.findall("\{\$[^\}\{]+\}", value, re.UNICODE)
            rvalue = value
            if search != []:
                for string in search:
                    var = string[2:-1]
                    var_position = 0
                    try:
                        var_position= int(var)
                        if var_position < len(groups):
                            rvalue = rvalue.replace(string, str(groups[var_position]))
                    except ValueError,e:
                        rvalue = value
        return rvalue

替换变量、用户功能和自定义用户功能

 def get_replace_array_value(self, value, groups):
        """Run the positional replacers in order until one changes *value*.

        The replacers are tried in this order, each on the original *value*:
        ``_replace_array_variables``, ``_replace_user_array_functions`` and
        ``_replace_custom_user_function_array``. The first result that
        differs from *value* is returned; when none differs, the result of
        the last replacer is returned.
        """
        replacer_names = (
            "_replace_array_variables",
            "_replace_user_array_functions",
            "_replace_custom_user_function_array",
        )
        outcome = ""
        for replacer_name in replacer_names:
            # Looked up lazily so later replacers are untouched once an
            # earlier one has produced a change.
            outcome = getattr(self, replacer_name)(value, groups)
            if outcome != value:
                break
        return outcome

使用位置参数替换自定义用户函数,并且检查所有变量是否有替换,以及调用函数在ParserUtil.py

    def _replace_custom_user_function_array(self,value,groups):
        """Expand ``{:func($N,...)}`` calls using positional *groups*.

        :param value: rule string that may contain custom function calls.
        :param groups: sequence of captured values, addressed by integer.
        :returns: *value* with each call replaced by ``str()`` of the result
            of the plugin method ``<func>_<plugin_id>``, where ``plugin_id``
            is read from the ``DEFAULT`` section. When no such method exists
            on ``Plugin``, the call is replaced by the last referenced group
            value (a call with no variables is then left as it is).

        Returns the value built so far, after a warning, as soon as a
        variable is not an integer or points past the end of *groups*.

        Fixes over the historical code:
        * ``int()`` raises ValueError for a non-numeric string, which the
          old ``except TypeError`` did not catch.
        * the fallback indexed *groups* with the variable *string*, which
          fails on a positional sequence.
        * the method is called through ``getattr`` instead of an ``exec``
          statement built from config text, which also does not exist on
          Python 3.
        """
        calls = re.findall(r"(\{:(\w+)\((\$[^\)]+)?\)\})", value, re.UNICODE)
        for (string_matched, func, variables) in calls:
            positions = []
            for var in split_variables(variables):
                try:
                    var_pos = int(var)
                except (TypeError, ValueError):
                    logger.warning("Can not replace '%s'" % var)
                    return value
                if var_pos >= len(groups):
                    logger.warning("Can not replace '%s' " % var)
                    return value
                positions.append(var_pos)

            func_name = "%s_%s" % (func, self.get("DEFAULT", "plugin_id"))
            if func_name != Plugin.TRANSLATION_FUNCTION and \
               hasattr(Plugin, func_name):
                try:
                    result = getattr(self, func_name)(
                        *[groups[pos] for pos in positions])
                    value = value.replace(string_matched, str(result))
                except TypeError as e:
                    # Typically a mismatch between the number of variables
                    # and the function's parameters.
                    logger.error(e)
            else:
                logger.warning("Function '%s' is not implemented" % (func))
                if positions:
                    value = value.replace(string_matched,
                                          str(groups[positions[-1]]))
        return value

替换位置函数,检查所有变量是否有替换,然后调用ParserUtil.py函数、getattr等函数执行替换功能

def _replace_user_array_functions(self, value, groups):
        """Expand ``{func($N,...)}`` calls using positional *groups*.

        :param value: rule string, or None.
        :param groups: sequence of captured values, addressed by integer.
        :returns: None when *value* is None; otherwise *value* after
            processing every call found in it.

        For each call:
        * when a variable is not an integer (a warning is logged) or is past
          the end of *groups*, the value built so far is returned at once.
        * when ``func`` is not the translation function and ``ParserUtil``
          has an attribute of that name, the call is replaced by ``str()``
          of that function applied to the ``str()`` of each referenced
          group.
        * otherwise each referenced group is looked up as an option of the
          translation section; when present, the whole *value* becomes the
          translated text.

        Fixes over the historical code: a stray ``.`` line that was a syntax
        error is removed; the function is called directly instead of through
        an ``exec`` statement; a non-numeric variable no longer raises an
        uncaught ValueError; an unreachable second bounds check is dropped.
        """
        if value is None:
            return None

        calls = re.findall(r"(\{(\w+)\((\$[^\)]+)\)\})", value, re.UNICODE)
        for (string_matched, func, variables) in calls:
            try:
                positions = [int(var) for var in split_variables(variables)]
            except ValueError:
                logger.warning("Can not replace '%s'" % variables)
                return value

            if any(pos > len(groups) - 1 for pos in positions):
                return value

            if func != Plugin.TRANSLATION_FUNCTION and \
               hasattr(ParserUtil, func):
                f = getattr(ParserUtil, func)
                try:
                    result = f(*[str(groups[pos]) for pos in positions])
                    value = value.replace(string_matched, str(result))
                except TypeError as e:
                    logger.error(e)
            else:
                logger.debug("Tranlation fucntion...")
                for pos in positions:
                    var_value = groups[pos]
                    if self.has_section(Plugin.TRANSLATION_SECTION):
                        if self.has_option(Plugin.TRANSLATION_SECTION,
                                           var_value):
                            value = self.get(Plugin.TRANSLATION_SECTION,
                                             var_value)

        return value


通过正则表达式,查找配置参数中的_CFG(section,option)值,并将其替换为全局配置文件中的值

   def replace_config(self, conf):
        """Resolve ``\\_CFG(section,option)`` references in every option.

        Each option value of each section (sections visited in sorted order)
        is scanned for references. A reference whose section/option exists
        in *conf* is replaced by ``conf.get(section, option)`` and the option
        is written back with ``self.set``. Unknown references are left as
        they are.

        :param conf: object providing ``has_option`` and ``get``.
        """
        for section_name in sorted(self.sections()):
            for option_name in self.options(section_name):
                text = self.get(section_name, option_name)
                references = re.findall("(\\\\_CFG\(([\w-]+),([\w-]+)\))", text, re.UNICODE)
                for (reference, cfg_section, cfg_option) in references:
                    if not conf.has_option(cfg_section, cfg_option):
                        continue
                    replacement = conf.get(cfg_section, cfg_option)
                    text = text.replace(reference, replacement)
                    self.set(section_name, option_name, text)

总规则下,regexp条目中替换处理每一个反斜线

 def replace_aliases(self, aliases):
        """Expand ``\\ALIAS`` tokens in the ``regexp`` of every rule.

        A token is a backslash followed by two or more word characters. When
        *aliases* has an option of that name in its ``regexp`` section, the
        token is replaced by that option's value; other tokens are left
        untouched. A rule is written back with ``self.set`` only when its
        ``regexp`` actually changed.

        :param aliases: object providing ``has_option`` and ``get``.

        Fixes over the historical code:
        * the ``def`` line lacked its colon.
        * ``dict.iterkeys()`` does not exist on Python 3; iterating the dict
          directly works on both major versions.
        * every occurrence is now substituted on its own match in a single
          pass. The old ``str.replace`` per alias also rewrote the prefix of
          a longer alias (``\\IP`` inside ``\\IPV4``).
        """
        alias_token = re.compile(r"\\\w\w+", re.UNICODE)

        def _expand(match):
            # Return the alias definition, or the token itself when unknown.
            token = match.group(0)
            name = token[1:]
            if aliases.has_option("regexp", name):
                return aliases.get("regexp", name)
            return token

        for rule in self.rules():
            regexp = self.get(rule, 'regexp')
            # A function replacement is inserted literally, so backslashes
            # in the alias definition are not reinterpreted.
            expanded = alias_token.sub(_expand, regexp)
            if expanded != regexp:
                self.set(rule, "regexp", expanded)

从get_replace_value()调用,通过正则替换变量

    def _replace_variables(self, value, groups, rounds=2):

        for i in range(rounds):
            search = re.findall("\{\$[^\}\{]+\}", value, re.UNICODE)
            if search != []:
                for string in search:
                    var = string[2:-1]
                    if groups.has_key(var):
                        value = value.replace(string, str(groups[var]))

        return value

判断值中是否含有需要替换的变量;如果没有,get_replace_value()就可以跳过对应的替换步骤

    def _replace_variables_assess(self, value):
        ret = 0
        for i in range(2):
            search = re.findall("\{\$[^\}\{]+\}", value, re.UNICODE)
            if search != []:
                ret = i
        return ret

通过正则搜索值中的{translate($变量)}调用,并用translation节中与该变量值对应的内容进行替换

    def _replace_translations(self, value, groups):
        """Resolve ``{translate($name)}`` calls through the translation section.

        :param value: rule string that may contain translate calls.
        :param groups: mapping from variable name to captured value.
        :returns: for each call whose variable is in *groups*, the WHOLE
            value (not only the matched call) becomes, in order of
            preference: the option of the translation section named after
            the captured value; the ``_DEFAULT_`` option of that section;
            the captured value itself. Calls whose variable is unknown leave
            *value* unchanged.

        A warning is logged when the section is missing or the captured
        value has no translation of its own.

        ``dict.has_key`` was removed in Python 3, so membership is tested
        with ``in``; the nested conditionals are flattened, outcomes
        unchanged.
        """
        regexp = r"(\{(" + Plugin.TRANSLATION_FUNCTION + r")\(\$([^\)]+)\)\})"
        section = Plugin.TRANSLATION_SECTION
        for (_matched, _func, var) in re.findall(regexp, value, re.UNICODE):
            if var not in groups:
                continue

            if not self.has_section(section):
                logger.warning("There is no translation section")
                value = groups[var]
            elif self.has_option(section, groups[var]):
                value = self.get(section, groups[var])
            else:
                logger.warning("Can not translate '%s' value" % \
                    (groups[var]))
                if self.has_option(section, Plugin.TRANSLATION_DEFAULT):
                    value = self.get(section, Plugin.TRANSLATION_DEFAULT)
                else:
                    value = groups[var]

        return value

同上,确定替换变量是否成功,如果失败,则跳过get_replace_value()

    def _replace_translations_assess(self, value):
        """Tell whether *value* contains a ``{translate($name)}`` call.

        Returns ``self._MAP_REPLACE_TRANSLATIONS`` when a call is present,
        otherwise 0.
        """
        call_pattern = "(\{(" + Plugin.TRANSLATION_FUNCTION + ")\(\$([^\)]+)\)\})"
        if re.search(call_pattern, value, re.UNICODE) is None:
            return 0
        return self._MAP_REPLACE_TRANSLATIONS

函数被指定为{f($v)},并且从get_replace_value()调用,执行替换功能

    def _replace_user_functions(self, value, groups):
        """Expand ``{func($name,...)}`` calls using named *groups*.

        :param value: rule string that may contain function calls.
        :param groups: mapping from variable name to captured value.
        :returns: *value* with each call replaced by ``str()`` of the
            ``ParserUtil`` function of that name applied to the referenced
            group values. When ``func`` is the translation function or is
            not provided by ``ParserUtil``, a warning is logged and the call
            is replaced by the last referenced group value.

        Returns the value built so far, after a warning, as soon as a
        referenced variable is missing from *groups*.

        Fixes over the historical code: ``dict.has_key`` and the ``exec``
        statement do not exist on Python 3, so the function is now called
        directly; the fallback no longer depends on a loop variable leaking
        out of the validation loop, which was undefined when a call listed
        no usable variable.
        """
        calls = re.findall(r"(\{(\w+)\((\$[^\)]+)\)\})", value, re.UNICODE)
        for (string_matched, func, variables) in calls:
            names = split_variables(variables)
            for name in names:
                if name not in groups:
                    logger.warning("Can not replace '%s'" % (value))
                    return value

            if func != Plugin.TRANSLATION_FUNCTION and \
               hasattr(ParserUtil, func):
                f = getattr(ParserUtil, func)
                try:
                    result = f(*[groups[name] for name in names])
                    value = value.replace(string_matched, str(result))
                except TypeError as e:
                    # Typically a mismatch between the number of variables
                    # and the function's parameters.
                    logger.error(e)
            else:
                logger.warning(
                    "Function '%s' is not implemented" % (func))
                if names:
                    value = value.replace(string_matched,
                                          str(groups[names[-1]]))

        return value

同上,确定替换变量是否成功,如果失败,则跳过get_replace_value()

    def _replace_user_functions_assess(self, value):
        search = re.findall("(\{(\w+)\((\$[^\)]+)\)\})", value, re.UNICODE)
        if search != []:
            return self._MAP_REPLACE_USER_FUNCTIONS
        
        return 0

替换形如{:func($变量)}的自定义用户函数:先检查所有变量是否都存在于groups中,然后调用名为"函数名_插件id"的插件方法,并用其返回值替换该调用

  def _replace_custom_user_functions(self,value,groups):
        """Expand ``{:func($name,...)}`` calls using named *groups*.

        :param value: rule string that may contain custom function calls.
        :param groups: mapping from variable name to captured value.
        :returns: *value* with each call replaced by ``str()`` of the result
            of the plugin method ``<func>_<plugin_id>``, where ``plugin_id``
            is read from the ``DEFAULT`` section. When no such method exists
            on ``Plugin``, a warning is logged and the call is replaced by
            the last referenced group value (a call with no variables is
            then left as it is).

        Returns the value built so far, after a warning, as soon as a
        referenced variable is missing from *groups*.

        Fixes over the historical code: ``dict.has_key`` and the ``exec``
        statement do not exist on Python 3, so the method is now called
        through ``getattr``; the fallback no longer depends on a loop
        variable that was undefined for a call without variables.
        """
        calls = re.findall(r"(\{:(\w+)\((\$[^\)]+)?\)\})", value, re.UNICODE)
        for (string_matched, func, variables) in calls:
            names = split_variables(variables)
            for name in names:
                if name not in groups:
                    logger.warning("Can not replace '%s'" % (name))
                    return value

            func_name = "%s_%s" % (func, self.get("DEFAULT", "plugin_id"))
            if func_name != Plugin.TRANSLATION_FUNCTION and \
               hasattr(Plugin, func_name):
                try:
                    result = getattr(self, func_name)(
                        *[groups[name] for name in names])
                    value = value.replace(string_matched, str(result))
                except TypeError as e:
                    # Typically a mismatch between the number of variables
                    # and the function's parameters.
                    logger.error(e)
            else:
                logger.warning(
                    "Function '%s' is not implemented" % (func))
                if names:
                    value = value.replace(string_matched,
                                          str(groups[names[-1]]))

        return value

替换数组变量连接的值

def __replaceConcatFunction(self,value,groups):

        concat =""
        m =  re.match("\$CONCAT\((?P<params>.*)\)",value)
        if m:
            mdict = m.groupdict()
            if mdict.has_key('params'):
                paramlist = mdict['params'].split(',')
                for param in paramlist:
                    if param.startswith('$'):#assume variable
                        if groups.has_key(param[1:]):
                            concat+=groups[param[1:]]
                        else:
                            concat+=param
                    else:
                        concat+=param
        return concat

检查是否有必要进行连接函数替换

 def __checkReplaceConcatFunction(self,value):
        ret = 0
        if re.match("\$CONCAT\((.*)\)",value):
            ret = self._MAP_REPLACE_CONCAT
        return ret

替换传递过来的对象中给定的值

 def __replace_translate2_function(self,value,groups):
        """Resolve a ``{translate2($variable,$section)}`` expression.

        :param value: rule string expected to start with a translate2 call.
        :param groups: mapping from variable name to captured value.
        :returns: the option of ``section`` named after the captured value
            of ``variable``. *value* is returned unchanged when it is not a
            translate2 call, when the variable is not in *groups*, or when
            the section or the option does not exist (the last two log a
            warning).

        The historical code had no final ``return``. Every path other than a
        successful translation therefore returned None, and
        get_replace_value() replaced the rule value with None.
        """
        m = self.__regex_check_for_translate2_function.match(value)
        if not m:
            return value

        # Both arguments are captured together with their leading "$".
        variable = m.group('variable').replace('$','')
        translate_section = m.group('translate_section').replace('$','')
        if variable not in groups:
            return value

        if not self.has_section(translate_section):
            logger.warning("Translate2 translation_section(%s) not found" % translate_section)
            return value

        if not self.has_option(translate_section,groups[variable]):
            logger.warning("Translate2 variable(%s) not found on the translate section %s" % (groups[variable],translate_section))
            return value

        return self.get(translate_section, groups[variable])

检查是否需要进行translate2替换

   def __check_for_translate2_function(self,value):
        """Tell whether *value* needs a translate2 replacement.

        @param value: the string in the rule
        @return: ``self._MAP_REPLACE_TRANSLATE2`` when *value* starts with a
            ``{translate2($variable,$section)}`` call, otherwise 0.
        """
        found = Plugin.__regex_check_for_translate2_function.match(value)
        if found is None:
            return 0
        return self._MAP_REPLACE_TRANSLATE2

替换concat函数、自定义用户函数、用户函数、翻译、变量等

   def get_replace_value(self, value, groups, replace=15):
        """Apply the replacement steps selected by the *replace* bit mask.

        The steps run in this fixed order, each on the output of the
        previous one: translate2, variables, translations, user functions,
        custom user functions, concat. The two lowest bits of *replace*
        (mask 3) give the number of variable-replacement rounds; the other
        steps are switched on by the ``_MAP_REPLACE_*`` flags. A *replace*
        of zero or less returns *value* untouched.
        """
        if replace <= 0:
            return value

        if replace & self._MAP_REPLACE_TRANSLATE2:
            value = self.__replace_translate2_function(value, groups)

        variable_rounds = replace & 3
        if variable_rounds:
            value = self._replace_variables(value, groups, variable_rounds)

        if replace & self._MAP_REPLACE_TRANSLATIONS:
            value = self._replace_translations(value, groups)

        if replace & self._MAP_REPLACE_USER_FUNCTIONS:
            value = self._replace_user_functions(value, groups)

        if replace & self._MAP_REPLACE_CUSTOM_USER_FUNCTIONS:
            value = self._replace_custom_user_functions(value,groups)

        if replace & self._MAP_REPLACE_CONCAT:
            value = self.__replaceConcatFunction(value,groups)

        return value

类CommandLineOptions,用于配置各种数据

class CommandLineOptions:
    """Parse the agent's command-line switches with ``optparse``.

    Registered options: ``-v/--verbose`` (counted), ``-d/--daemon``,
    ``-f/--force``, ``-s/--stats`` (one of ``all``, ``clients``,
    ``plugins``) and ``-c/--config FILE``. ``optparse`` adds ``-h`` and
    ``--version`` itself.

    The usage line used to advertise ``-q`` and ``-g``, which are not
    registered (passing them is rejected), and omitted ``-s``. It now lists
    exactly the registered options.
    """

    def __init__(self, args=None):
        """Parse *args* and keep the resulting option values.

        :param args: list of argument strings; None (the default) parses
            ``sys.argv[1:]``, as before.

        The process exits through ``parser.error`` (SystemExit) when more
        than one positional argument is given or when ``-v`` is combined
        with ``-d``.
        """
        self.__options = None

        parser = OptionParser(
            usage="%prog [-v] [-d] [-f] [-s all|clients|plugins] " + \
                  "[-c config_file]",
            version="OSSIM (Open Source Security Information Management) " + \
                      "- Agent ")

        parser.add_option("-v", "--verbose", dest="verbose",
                          action="count",
                          help="verbose mode, makes lot of noise")
        parser.add_option("-d", "--daemon", dest="daemon", action="store_true",
                          help="Run agent in daemon mode")
        parser.add_option("-f", "--force", dest="force", action="store_true",
                          help="Force startup overriding pidfile")
        parser.add_option("-s", "--stats", dest="stats", type='choice',
                          choices=['all', 'clients', 'plugins'], default=None,
                          help="Get stats about the agent")
        parser.add_option("-c", "--config", dest="config_file", action="store",
                          help="read config from FILE", metavar="FILE")
        (self.__options, positional) = parser.parse_args(args)

        if len(positional) > 1:
            parser.error("incorrect number of arguments")

        if self.__options.verbose and self.__options.daemon:
            parser.error("incompatible options -v -d")

    def get_options(self):
        """Return the ``optparse`` values object produced by the parser."""
        return self.__options

辅助函数:split_variables从"$a,$b"形式的字符串中拆分出变量名列表;split_sids拆分sid列表,并展开"起-止"形式的范围

def split_variables(string):
    """Extract variable names from a ``$a,$b``-style argument string.

    Each name is the text that follows a ``$`` up to the next comma or
    whitespace character. Returns the names in order of appearance, without
    the ``$``; an input holding no ``$name`` yields an empty list.
    """
    names = []
    for found in re.finditer("(?:\$([^,\s]+))+", string):
        names.append(found.group(1))
    return names


def split_sids(string, separator=','):
    """Split a sid list and expand its ``first-last`` ranges.

    :param string: sids joined by *separator*, e.g. ``"4,1-3,9"``.
    :param separator: delimiter between entries (default ``","``).
    :returns: list of strings holding the plain entries in their original
        order, followed by the members of every expanded range (inclusive on
        both ends), e.g. ``["4", "9", "1", "2", "3"]``.
    :raises ValueError: when a ``first-last`` bound is not an integer.

    The historical code removed ranges from the list it was iterating. That
    made the loop skip the entry after each range, so ``"1-3,5-7"`` left
    ``"5-7"`` unexpanded. It also shadowed the builtin ``list``. Two
    separate result lists avoid both problems and keep the output order.
    """
    plain = []
    expanded = []
    for sid in string.split(separator):
        bounds = sid.split('-')
        if len(bounds) == 2:
            first, last = int(bounds[0]), int(bounds[1])
            expanded.extend(str(i) for i in range(first, last + 1))
        else:
            plain.append(sid)
    return plain + expanded

标签:string,self,value,replace,agent,groups,var,源代码,OSSIM
来源: https://blog.csdn.net/aunt_y/article/details/121057735