'''
Copyright 2017, Fujitsu Network Communications, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
"""
Module that contains methods for testcase results related operations
!!! Important!!!
DO NOT import any modules from warrior/Framework package that uses
warrior/Framework/Utils/print_Utils.py at module level into this module
as it will lead to cyclic imports.
"""
import xml.etree.ElementTree as ET
import inspect
import re
from Framework.Utils.print_Utils import print_info, print_debug, print_warning,\
print_error, print_exception, print_sub, print_notype, print_without_logging
#import Framework.Utils.file_Utils as file_Utils
#import Framework.Utils.config_Utils as config_Utils
#import Framework.Utils.xml_Utils as xml_Utils
class TestcaseUtils(object):
"""testcase utils class"""
def __init__(self):
"""
set default value
"""
#self.resultfile = config_Utils.resultfile
self.root = None
self.current_pointer = None
self.gkeyword = {}
self.gkeywordloop = 0
self.gstep = {}
self.gsteploop = 0
self.gsubstep = {}
self.gsubsteploop = 0
self.grequirement = {}
self.grequirementloop = 0
self.gsubkey = {}
self.gsubkeyloop = 0
def file_utils(self):
"""
"""
import Framework.Utils.file_Utils as file_utils
return file_utils
def xml_utils(self):
"""
"""
import Framework.Utils.xml_Utils as xml_utils
return xml_utils
def print_output(self):
""" Prints the dump of the xml object to the file specified.
This function can be used for debugging purpose and
its called at the end of the functional calls.
:Arguments:
resultfile = Result File
:Returns:
None
"""
try:
import Framework.Utils.config_Utils as config_Utils
resultfile = config_Utils.resultfile
tree = ET.ElementTree(self.root)
tree.write(resultfile)
except UnicodeDecodeError as e:
print_exception(e)
except Exception as err:
print_exception(err)
@staticmethod
def p_open(fileobject):
"""Open a file"""
fobj = open(fileobject, 'a')
return fobj
@staticmethod
def p_close(fileobject):
"""Close a file"""
fileobject.close()
# define the testcase self.root element
def p_testcase(self):
"""Creates a self.root tag in the testcase result xml file"""
self.root = ET.Element("Testcase")
self.current_pointer = self.root
self.print_output()
def p_keyword(self, keyword_txt, driver_txt):
"""Creates a keyword tag as direct child to the <Testcase> tag """
self.p_testcase()
self.gkeywordloop = self.gkeywordloop+1
print_info("\n********************* Keyword: %s *********************\n" % keyword_txt)
self.gkeyword[self.gkeywordloop] = ET.SubElement(self.root, "Keyword")
self.current_pointer = self.gkeyword[self.gkeywordloop]
name = ET.SubElement(self.gkeyword[self.gkeywordloop], "Name")
name.text = keyword_txt
self.p_driver(driver_txt)
self.print_output()
def p_subkeyword(self, keyword_txt):
""" Creates a Keyword tag as the child node to the <SubStep> tag """
self.gsubkeyloop = self.gsubkeyloop+1
print_info("\n***************Sub-Keyword: %s "
"***************\n" % keyword_txt)
self.gsubkey[self.gsubkeyloop] = ET.SubElement(\
self.gstep[self.gsteploop], "Keyword")
self.current_pointer = self.gsubkey[self.gsubkeyloop]
self.gsubkey[self.gsubkeyloop].text = keyword_txt
self.print_output()
def p_driver(self, driver_txt):
"""Create a Driver tag"""
ldriver = ET.SubElement(self.current_pointer, "Driver")
ldriver.text = driver_txt
self.print_output()
def p_step(self, step_txt=""):
"""Create a step tag"""
self.gsteploop = self.gsteploop+1
self.gstep[self.gsteploop] = ET.SubElement(self.gkeyword[self.gkeywordloop], "Step")
self.current_pointer = self.gstep[self.gsteploop]
self.gstep[self.gsteploop].text = step_txt
self.print_output()
def update_kw_resultfile(self, kw_resultfile):
""" adds step_num attribute to the step tag in
test case result xml file"""
rf = ET.SubElement(self.gkeyword[self.gkeywordloop], "Resultfile")
rf.text = str(kw_resultfile)
def update_step_num(self, step_num):
""" adds step_num attribute to the step tag in
test case result xml file"""
self.gkeyword[self.gkeywordloop].set('step_num', str(step_num))
self.gstep[self.gsteploop].set('step_num', str(step_num))
def update_arguments(self, args):
"""Update the arguments supplied to the keyword """
arguements = ET.SubElement(self.gkeyword[self.gkeywordloop], "Arguments")
for arg in args:
ET.SubElement(arguements, "argument", name=str(arg), value=str(args[arg]))
def p_substep(self, substep_txt=""):
"""Create a substep tag"""
self.gsubsteploop = self.gsubsteploop+1
self.gsubstep[self.gsubsteploop] = ET.SubElement(self.gstep[\
self.gsteploop], "SubStep")
self.current_pointer = self.gsubstep[self.gsubsteploop]
self.gsubstep[self.gsubsteploop].text = substep_txt
print_info("\n<< Substep >>")
print_info("Keyword Description: {0}".format(substep_txt))
self.print_output()
def get_write_locn(self, level):
"""Get the write locn, either kw/step/substep/None
None = current level """
write_locn = self.current_pointer
try:
if level == "KW":
write_locn = self.gkeyword[self.gkeywordloop]
elif level == "STEP":
write_locn = self.gstep[self.gsteploop]
elif level == "SUBSTEP":
write_locn = self.gsubstep[self.gsteploop]
elif level == "NONE":
write_locn = self.current_pointer
except KeyError:
write_locn = self.current_pointer
return write_locn
def p_note_level(self, txt, print_type="info", level=None, ptc=True):
"""Create Note at the provided level"""
write_locn = self.get_write_locn(str(level).upper())
print_util_types = ["-D-", "", "-I-", "-E-", "-W-", "-N-",
"\033[1;31m-E-\033[0m", "\033[1;33m-W-\033[0m"]
p_type = {'INFO': print_info,
'DEBUG': print_debug,
'WARN': print_warning,
'WARNING': print_warning,
'ERROR': print_error,
'EXCEPTION': print_exception,
'SUB': print_sub,
'NOTYPE': print_notype,
'NOLOG': print_without_logging}.get(str(print_type).upper(),
print_info)
txt = self.rem_nonprintable_ctrl_chars(str(txt))
if write_locn is None:
write_locn = self.current_pointer
if ptc and print_type not in print_util_types:
p_type(txt)
# self.current_pointer may be None,which is not a intended behavior
# If print_type is nolog or -N-,the message will be logged in terminal
# but not in result file
if write_locn is not None and (print_type != '-N-' and print_type != 'nolog'):
doc = ET.SubElement(write_locn, "Note")
doc.text = txt
self.print_output()
# The below elif is bypasses the else below. As we may want to
# print items (banners) before we have a handle to write
elif print_type == "notype":
pass
elif print_type not in print_util_types and print_type != 'nolog':
print_error("Unable to write to location in result file, the "
"message is logged in terminal but not in result file")
def rem_nonprintable_ctrl_chars(self, txt):
"""Remove non_printable ascii control characters """
#Removes the ascii escape chars
try:
txt = re.sub(r'[^\x20-\x7E|\x09-\x0A]','', txt)
# remove non-ascii characters
txt = repr(txt).decode('unicode_escape').encode('ascii','ignore')[1:-1]
except Exception as exception:
print_exception(exception)
return txt
def p_note(self, txt, print_type="info"):
"""Adds a note to the testcase xml result file under the current tag
:Arguments:
1. txt = (string) a text description of the step
:Returns:
None
"""
self.p_note_level(txt, print_type)
def p_custom_tag(self, name, txt):
"""Adds a note to the testcase xml result file under the current tag
:Arguments:
1. txt = (string) a text description of the step
:Returns:
None
"""
doc = ET.SubElement(self.current_pointer, str(name))
doc.text = str(txt)
self.print_output()
def p_status(self, status, level):
"""Report the status"""
if level.upper() == "SUBSTEP":
levelobj = self.gsubstep[self.gsubsteploop]
elif level.upper() == "STEP":
levelobj = self.gstep[self.gsteploop]
elif level.upper() == "KEYWORD":
levelobj = self.gkeyword[self.gkeywordloop]
elif level.upper() == "SUBKEYWORD":
levelobj = self.gsubkey[self.gsubkeyloop]
status_tag = ET.SubElement(levelobj, "%sStatus" % level)
status_tag.text = status
self.print_output()
def p_ran(self, level, text=""):
"""Report a pass """
print_info("{0} STATUS:RAN".format(text))
#print_info("PASS: %s\n" % text)
self.p_status("RAN", level)
def p_pass(self, level, text=""):
"""Report a pass """
print_info("{0} STATUS:PASS".format(text))
#print_info("PASS: %s\n" % text)
self.p_status("PASS", level)
def p_fail(self, level, text=""):
"""Report a fail """
print_info("{0} STATUS:FAIL".format(text))
#print_info("FAIL: %s\n" % text)
self.p_status("FAIL", level)
def p_exception(self, level, text=""):
"""Report a exception """
print_info("{0} STATUS:EXCEPTION".format(text))
#print_info("EXCEPTION: %s\n" % text)
self.p_status("EXCEPTION", level)
def p_error(self, level, text=""):
"""Report a error """
print_info("{0} STATUS:ERROR".format(text))
#print_error("ERROR: %s\n" % text)
self.p_status("ERROR", level)
def p_warn(self, level, text=""):
"""Report a warning """
print_info("{0} STATUS:WARNING".format(text))
#print_warning("WARNING: %s\n" % text)
self.p_status("WARNING", level)
def p_skip(self, level, text=""):
"""Report a skip """
print_info("{0} STATUS:SKIPPED".format(text))
#print_info("SKIPPED: %s\n" % text)
self.p_status("SKIPPED", level)
def p_report_requirements(self, requirement_id):
"""Report the requirement-id to the xml result file """
self.grequirementloop = self.grequirementloop + 1
print_info("Requirement: %s " % requirement_id)
self.grequirement[self.grequirementloop] = ET.SubElement(self.current_pointer,
"Requirement")
self.grequirement[self.grequirementloop].text = requirement_id
self.print_output()
@staticmethod
def get_wdesc_string(function_object):
""" Gets the WDesc string from the function object
"""
source = inspect.getsource(function_object)
search_wdesc_var = function_object.__name__
#code_lines = source.split('WDesc')
code_lines = re.split("(?i)WDESC", source)
if len(code_lines) > 1:
funcbody = code_lines[1]
search_var = re.search('(\")([^\"]*)(\")', funcbody, re.DOTALL|re.MULTILINE)
if search_var is not None:
search_wdesc_var = search_var.group()
elif search_var is None:
search_wdesc_var = function_object.__name__
return search_wdesc_var
@staticmethod
def p_convert_logical(text):
""" map NO=false, YES=True and return the bool to the calling function"""
return {'NO': False, 'YES': True}.get(text.upper())
@staticmethod
def convert_logic(text):
"""Converts
True -> PASS
False -> FAIL
ERROR -> ERROR
EXCEPTION -> ERROR
"""
result = {True: 'PASS', False: 'FAIL',
'ERROR': 'ERROR', 'EXCEPTION': 'ERROR', 'RAN': 'RAN'}.get(text)
if result is None:
print_error("junk or no value received, expecting TRUE/FALSE/ERROR/EXCEPTION")
result = 'ERROR'
return result
def report_status(self, status, text="", level='Keyword'):
"""
Reports the status to the testcase xml result file base on the received status
On receiving a True reports keyword status as Passed
On receiving a False reports keyword status as Failed
On receiving a Skip reports keyword status as Skipped
On receiving a Exception reports keyword status as Exception
On receiving a Error reports Keyword status as Error
On receiving a RAN reports Keyword status as RAN
:Arguments:
1. status = (bool) True or False
2. text = (string) any useful description
3. level = (string) only supported value currently is Keyword
:Returns:
None
"""
status = {'TRUE': self.p_pass, 'FALSE': self.p_fail,
'SKIP': self.p_skip, 'EXCEPTION': self.p_exception,
'ERROR': self.p_error, 'RAN': self.p_ran}.get(str(status).upper())
if status is None:
print_error("unexpected or no value received, expecting TRUE/FALSE/SKIP")
self.p_error(level, text)
else:
status(level, text)
def report_warning(self, status, text="", level='subStep'):
"""
Reports the status to the testcase xml result file based on the received status
On receiving a True reports keyword status as Passed
On receiving a False reports keyword status as Warning
On receiving a Skip reports keyword status as Skipped
On receiving a Exception reports keyword status as Exception
On receiving a Error reports Keyword status as Error
:Arguments:
1. status = (bool) True or False
2. text = (string) any useful description
3. level = (string) only supported value currently is Keyword
:Returns:
None
"""
from WarriorCore.Classes.war_cli_class import WarriorCliClass
if WarriorCliClass.mock:
if str(status).upper() == 'TRUE' or str(status).upper() == 'FALSE':
status = 'RAN'
status = {'TRUE': self.p_pass, 'FALSE': self.p_warn,
'SKIP': self.p_skip, 'EXCEPTION': self.p_exception,
'ERROR': self.p_error, 'RAN': self.p_ran}.get(str(status).upper())
if status is None:
print_error("unexpected or no value received, expecting TRUE/FALSE/SKIP")
self.p_error(level, text)
else:
status(level, text)
def report_substep_status(self, status):
""" Same as reportWarning created to have a more meaningful name for the function...
Reports the status of a substep to the testcase xml result file base don the received status
On receiving a True reports substep as Passed
On receiving a False raises a warning
On receiving a Skip reports skips the substep execution
:Arguments:
1. status = (bool) True or False
2. text = (string) any useful description
3. level = (string) only supported value currently is subStep
:Returns:
None
"""
print_info("\n<< Substep status >>")
self.report_warning(status)
def report_keyword_status(self, status, kw_name=''):
""" Same as reportStatus: changed function name to be more meaningful
Reports the status of a Keyword to the testcase xml result file base don the received status
On receiving a True reports keyword as Passed
On receiving a False reports keyword as Failure
On receiving a Skip reports skips the keyword execution
:Arguments:
1. status = (bool) True or False
2. text = (string) any useful description
3. level = (string) only supported value currently is Keyword
:Returns:
None
"""
kw_text = "KEYWORD:{0} ".format(kw_name)
self.report_status(status, text=kw_text)
def report_subkeyword_status(self, status, text, level='SubKeyword'):
"""Reports the status of a sub-keyword to the testcase xml result file
based on the received status
On receiving a True reports keyword as Passed
On receiving a False reports keyword as Failure
On receiving a Skip reports skips the keyword execution
:Arguments:
1. status = (bool) True or False
2. text = (string) any useful description
3. level = (string) default would be 'Subkeyword'
:Returns:
None
"""
self.report_status(status, text, level)
def p_test_result(self, text, resultfile):
"""Report test result"""
result = self.convert_logic(text)
self.root = self.xml_utils().getRoot(resultfile)
tcstatus = ET.SubElement(self.root, "TCstatus")
tcstatus.text = result
tree = ET.ElementTree(self.root)
tree.write(resultfile)
def append_result_files(self, dst_resultfile, kw_resultfile_list, dst_root='Testcase', childtag='Keyword'):
"""Append kw/system result files into a testcase result file"""
try:
finstring = ''
for kw_file in kw_resultfile_list:
if kw_file is not None and kw_file is not False:
tree = self.xml_utils().get_tree_from_file(kw_file)
self.root = tree.getroot()
for child in self.root:
if child.tag == childtag:
finstring = finstring+self.xml_utils().convert_element_to_string(child)
tc_string = ' '
if self.file_utils().fileExists(dst_resultfile):
tc_tree = ET.parse(dst_resultfile)
tc_root = tc_tree.getroot()
for tc_child in tc_root:
tc_string = tc_string+self.xml_utils().convert_element_to_string(tc_child)
finalresult = '\n'.join(['<{0}>'.format(dst_root), tc_string + finstring,
'</{0}>'.format(dst_root)])
with open(dst_resultfile, 'w') as resultfile:
resultfile.write(finalresult)
resultfile.flush()
resultfile.close()
except Exception, err:
print_info('unexpected error: {0}'.format(str(err)))
@staticmethod
def compute_status_using_impact(input_status_list, input_impact_list, status=True):
"""Computes the status from the list of input status and input impact """
value = True
result = []
for i in range(0, len(input_status_list)):
input_status = input_status_list[i]
input_impact = input_impact_list[i]
if str(input_impact).upper() == 'IMPACT' and input_status != None:
if str(input_status).upper() == 'ERROR' or str(input_status).upper() == 'EXCEPTION':
value = 'ERROR'
elif str(input_status).upper() == 'RAN':
value = 'RAN'
elif input_status is False:
value = False
elif input_status is True:
value = True
result.append(value)
if 'ERROR' in result:
status = 'ERROR'
elif False in result:
status = False
elif 'RAN' in result:
status = 'RAN'
else:
status = True
return status
@staticmethod
def compute_status_without_impact(input_status_list, status=True):
"""Computes the system status from the list of step status"""
status = True
for i in range(0, len(input_status_list)):
input_status = input_status_list[i]
if input_status is not None:
if str(input_status).upper() == 'ERROR' or str(input_status).upper() == 'EXCEPTION':
#input_status_list[i] = False
status = 'ERROR'
break
elif str(input_status).upper() == 'RAN':
status = 'RAN'
elif input_status == False:
status = False
break
elif input_status is True:
status = True
return status
def compute_system_resultfile(self, kw_resultfile_list, resultsdir, system_name):
"""Generates a system resultfile from the list of keyword result files """
system_results_dir = self.file_utils().createDir(resultsdir, 'System_Results')
system_resultfile = self.file_utils().getCustomLogFile('system', system_results_dir,
system_name, '.xml')
self.append_result_files(system_resultfile, kw_resultfile_list, dst_root='System')
return system_resultfile
def add_defects_to_resultfile(self, resultfile, defect_id):
"""Adds defects if any to the testcase result file """
self.root = self.xml_utils().getRoot(resultfile)
defects = ET.SubElement(self.root, "Defect")
defects.text = defect_id
tree = ET.ElementTree(self.root)
tree.write(resultfile)
def get_impact_from_xmlfile(self, element):
"""Gets the impact value of a step/testcase/suite
from the testcase.xml/testsuite.xml/project.xml file """
impact = self.xml_utils().get_text_from_direct_child(element, 'impact')
if impact is None or impact is False:
impact = 'IMPACT'
elif impact is not None and impact is not False:
impact = str(impact).strip()
supported_values = ['impact', 'noimpact']
if impact.lower() not in supported_values:
print_warning("unsupported value '{0}' provided for impact,"\
"supported values are '{1}',"\
"case-insensitive".format(impact, supported_values))
print_info("Hence using default value for impact which is 'impact'")
impact = 'IMPACT'
return impact
def get_context_from_xmlfile(self, element):
"""Gets the context value of a step/testcase/suite
from the testcase.xml/testsuite.xml/project.xml file """
context = self.xml_utils().get_text_from_direct_child(element, 'context')
if context is None or context is False:
context = 'POSITIVE'
elif context is not None and context is not False:
context = str(context).strip()
supported_values = ['positive', 'negative']
if context.lower() not in supported_values:
print_warning("unsupported value '{0}' provided for context,"\
"supported values are '{1}',"\
"case-insensitive".format(context, supported_values))
print_info("Hence using default value for context which is 'positive'")
context = 'POSITIVE'
return context
def get_description_from_xmlfile(self, element):
"""Gets the description value of a step/testcase/suite
from the testcase.xml/testsuite.xml/project.xml file """
description = self.xml_utils().get_text_from_direct_child(element, 'Description')
if not description:
description = '*** Description not provided by user ***'
else:
description = str(description).strip()
return description
def get_defonerror_fromxml_file(self, filepath):
"""Gets the default on error value of a step/testcase/suite
from the testcase.xml/testsuite.xml/project.xml file """
def_on_error_action = self.xml_utils().getChildAttributebyParentTag(filepath, 'Details',
'default_onError', 'action')
if def_on_error_action is None or def_on_error_action is False:
def_on_error_action = 'NEXT'
elif def_on_error_action is not None and def_on_error_action is not False:
supported_values = ['next', 'goto', 'abort', 'abort_as_error']
if not str(def_on_error_action).lower() in supported_values:
print_warning("unsupported option '{0}' provided for default_onError"\
"action, supported values are {1}".format(def_on_error_action,
supported_values))
print_info("Hence using default value for default_onError action which is 'next'")
def_on_error_action = 'NEXT'
return def_on_error_action
def get_requirement_id_list(self, testcase_filepath):
"""gets the list of requirements for the testcase """
tc_root = self.xml_utils().getRoot(testcase_filepath)
requirements = tc_root.find('Requirements')
req_id_list = []
if requirements is None or requirements is False:
print_warning('Testcase does not have any requirements')
else:
requirement_list = requirements.findall('Requirement')
if requirement_list is None:
print_warning('Testcase does not have any requirement')
else:
for req in requirement_list:
req_id = req.text
if req_id is not None and req_id is not False:
req_id = req_id.strip()
req_id_list.append(req_id)
if len(req_id_list) == 0:
req_id_list = None
return req_id_list
def get_steps_list(self, testcase_filepath):
"""Takes the location of any Testcase xml file as input
Returns a list of all the step elements present in the Testcase
:Arguments:
1. testcase_filepath = full path of the Testcase xml file
"""
step_list = []
tc_root = self.xml_utils().getRoot(testcase_filepath)
steps = tc_root.find('Steps')
if steps is None:
print_info('Testcase has no commands: tag <Steps> not found in the input file ')
else:
step_list = steps.findall('step')
return step_list