'''
Copyright 2017, Fujitsu Network Communications, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import os
import time
from Framework import Utils
from Framework.Utils import data_Utils, file_Utils, datetime_utils
from Framework.Utils.testcase_Utils import pNote
class CIregressionActions(object):
    """
    This class contains keywords that are used in warrior regression test
    """

    # Marker line that write_to_file() puts before and after every entry.
    _BLOCK_MARKER = "****************************"

    def __init__(self):
        """
        constructor

        Caches the result file, data file, logs directory, file name and
        log file values exposed by Utils.config_Utils.
        """
        self.resultfile = Utils.config_Utils.resultfile
        self.datafile = Utils.config_Utils.datafile
        self.logsdir = Utils.config_Utils.logsdir
        self.filename = Utils.config_Utils.filename
        self.logfile = Utils.config_Utils.logfile

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _record_step(self, wdesc, key_prefix, system_name, step_num):
        """Print the keyword description and log the step to the log file.

        Arguments:
            wdesc: description printed through pNote
            key_prefix: keyword name (with trailing underscore) that is
                        prefixed to step_num to build the logged key
            system_name: system on which the step ran
            step_num: step number in the test case file
        Returns:
            True (the value returned by write_to_file)
        """
        pNote(wdesc)
        return self.write_to_file(key_prefix + str(step_num), system_name)

    @staticmethod
    def _check_type(var, varname, datatype):
        """Check that var is exactly of type datatype.

        The comparison is on the exact type (not isinstance) so that, for
        example, a bool is not accepted where an int is expected.

        Arguments:
            var: value to be checked
            varname: name used for the value in the error message
            datatype: the expected type
        Returns:
            True if the type matches, else False (an error is printed)
        """
        vartype = type(var)
        if vartype is datatype:
            return True
        pNote('{} is expected to be {} type, but found to be of '
              '{} type'.format(varname, datatype, vartype), "error")
        return False

    @staticmethod
    def _file_types():
        """Return the tuple of types that are accepted as a file object.

        The builtin ``file`` type exists only in Python 2; in Python 3 the
        objects returned by open() derive from io.IOBase.
        """
        import io  # local import so the class needs no extra module import
        try:
            return (file, io.IOBase)  # noqa: F821 - Python 2 builtin
        except NameError:
            return (io.IOBase,)

    # ------------------------------------------------------------------
    # keywords
    # ------------------------------------------------------------------
    def write_to_file(self, key, system_name):
        """
        Here we are writing to the file which keyword ran and on which system
        The log file path is also stored in the data repository under the
        key "output_file".
        Argument:
            Key: It is basically the keyword name followed by step num in the test case
            system_name: It is the system_name on which the step ran
        Returns:
            True
        """
        data_Utils.update_datarepository({"output_file": self.logfile})
        entry = ("\n" + self._BLOCK_MARKER +
                 "\n" + key + " ran" +
                 "\n" + "Ran on " + system_name +
                 "\n" + self._BLOCK_MARKER + "\n")
        with open(self.logfile, "a+") as fo:
            fo.write(entry)
        return True

    def once_per_tc_with_system_name_given(self, system_name, step_num):
        """
        It is used to test the functionality of once per tc with system name given in the test case
        Arguments:
            system_name: Name of the system on which it needs to be run
            step_num: It is the step_num in the test case file
        Returns:
            Returns True
        """
        return self._record_step("Once per tese case with system name given",
                                 'once_per_tc_with_system_name_given_',
                                 system_name, step_num)

    def once_per_tc_with_no_name_given(self, system_name, step_num):
        """
        It is used to test the functionality of once per tc with no system name given in the test case
        Arguments:
            system_name: Name of the system on which it needs to be run
            step_num: It is the step_num in the test case file
        Returns:
            Returns True
        """
        return self._record_step("Once per testcase with system name not given",
                                 'once_per_tc_with_no_name_given_',
                                 system_name, step_num)

    def once_per_tc_with_error(self, system_name, step_num):
        """
        It is used to test the functionality of once per tc with error in the test case
        Arguments:
            system_name: Name of the system on which it needs to be run
            step_num: It is the step_num in the test case file
        Returns:
            Never returns, the step is logged and then an exception is raised
        Raises:
            Exception: always
        """
        self._record_step("Once per testcase with error",
                          'once_per_tc_with_error_', system_name, step_num)
        raise Exception("This is raised in CIregressionActions.once_per_tc_with_error")

    def standard_with_system_name_given(self, system_name, step_num):
        """
        It is used to test the functionality of standard with system name given in the test case
        Arguments:
            system_name: Name of the system on which it needs to be run
            step_num: It is the step_num in the test case file
        Returns:
            Returns True
        """
        return self._record_step("Standard with system name given",
                                 'standard_with_system_name_given_',
                                 system_name, step_num)

    def standard_with_system_name_not_given(self, system_name, step_num):
        """
        It is used to test the functionality of standard with no system name given in the test case
        Arguments:
            system_name: Name of the system on which it needs to be run
            step_num: It is the step_num in the test case file
        Returns:
            Returns True
        """
        return self._record_step("Standard with system name not given",
                                 'standard_with_system_name_not_given_',
                                 system_name, step_num)

    def standard_with_error(self, system_name, step_num):
        """
        It is used to test the functionality of standard with error in the test case
        Arguments:
            system_name: Name of the system on which it needs to be run
            step_num: It is the step_num in the test case file
        Returns:
            Never returns, the step is logged and then an exception is raised
        Raises:
            Exception: always
        """
        self._record_step("Standard with error",
                          'standard_with_error_', system_name, step_num)
        raise Exception("This is raised in CIregressionActions.standard_with_error")

    def end_of_tc_with_system_name_given(self, system_name, step_num):
        """
        It is used to test the functionality of end of tc with system name given in the test case
        Arguments:
            system_name: Name of the system on which it needs to be run
            step_num: It is the step_num in the test case file
        Returns:
            Returns True
        """
        return self._record_step("End of testcase with system name given",
                                 'end_of_tc_with_system_name_given_',
                                 system_name, step_num)

    def end_of_tc_with_system_name_not_given(self, system_name, step_num):
        """
        It is used to test the functionality of end of tc with no system name given in the test case
        Arguments:
            system_name: Name of the system on which it needs to be run
            step_num: It is the step_num in the test case file
        Returns:
            Returns True
        """
        return self._record_step("End of testcase with system name not given",
                                 'end_of_tc_with_system_name_not_given_',
                                 system_name, step_num)

    def end_of_tc_with_error(self, system_name, step_num):
        """
        It is used to test the functionality of end of tc with error given in the test case
        Arguments:
            system_name: Name of the system on which it needs to be run
            step_num: It is the step_num in the test case file
        Returns:
            Never returns, the step is logged and then an exception is raised
        Raises:
            Exception: always
        """
        self._record_step("End of testcase with error",
                          'end_of_tc_with_error_', system_name, step_num)
        raise Exception("This is raised in CIregressionActions.end_of_tc_with_error")

    def compare_hybrid_tc_result(self, input_file):
        """
        It takes the input file path which is the expected result and compares with the log file
        (path read from the data repository key "output_file") and
        returns True if both matches else False and prints the difference to console.
        Arguments:
            input_file: It takes expected result file path as input
        Returns:
            True if both files have identical lines, else False
        """
        pNote("Compares the test case result file with expected result file")
        output_file = data_Utils.get_object_from_datarepository("output_file")
        # context managers make sure both handles are closed
        with open(output_file) as actual_fd:
            output_content = actual_fd.readlines()
        with open(input_file) as expected_fd:
            input_content = expected_fd.readlines()
        if output_content == input_content:
            return True

        output_set = set(output_content)
        input_set = set(input_content)
        # report from the side that has more distinct lines
        if len(output_set) > len(input_set):
            diff = output_set - input_set
            result_content = output_content
        else:
            diff = input_set - output_set
            result_content = input_content

        pNote("**************The difference between the files is******************")
        closing_marker = self._BLOCK_MARKER + "\n"
        # iterate in file order so that the report is deterministic
        for line in sorted(diff, key=result_content.index):
            index = result_content.index(line)
            try:
                last_index = result_content.index(closing_marker, index)
            except ValueError:
                # no closing marker after this line, print the line alone
                pNote(line.strip("\n"))
                continue
            # print the entry: opening marker, two text lines, closing marker;
            # clamp at 0 so a short file does not wrap to negative indexes
            first_index = max(last_index - 3, 0)
            for i in range(first_index, last_index + 1):
                pNote(result_content[i].strip("\n"))
        return False

    def increase_value(self, key, status, max_value, max_status):
        """
        write to a value in datarepo and return status
        if value == max, return max_status instead
        Arguments:
            key: data repository key that holds the counter
            status: "pass" or "fail", status to return below max_value
            max_value: counter value (convertible to int) at which
                       max_status is used instead of status
            max_status: "pass" or "fail", status to return at max_value
        Returns:
            status (bool), output_dict (dict) containing the new counter
        Raises:
            Exception: when the selected status is neither "pass" nor "fail"
        """
        value = data_Utils.get_object_from_datarepository(key)
        # presumably False is returned for a missing key - TODO confirm;
        # in that case (or for any non int value) the counter starts at 1
        if value is False or not isinstance(value, int):
            num = 1
        else:
            num = value + 1

        if num == int(max_value):
            status = max_status

        if status == "pass":
            status = True
        elif status == "fail":
            status = False
        else:
            raise Exception("This is raised in ci_regression_actions.increase_value")
        return status, {key: num}

    def local_data_test(self, desired_status):
        """For testing/demo/placeholder
        return true/false/exception based on input
        :Argument:
            desired_status = user desired status
            input pass->true, fail->false and everything else ->exception
        """
        if desired_status == "pass":
            return True
        if desired_status == "fail":
            return False
        raise Exception("This is raised in ci_regression_actions.local_data_test")

    def create_tmp_dir(self):
        """
        Create a temp directory for parallel execution test
        Returns:
            True and a dict with the key "parallel_exec_tmp_dir" when
            file_Utils.createDir returns a truthy value, else False
        """
        base_dir = file_Utils.getDirName(self.logsdir)
        path = file_Utils.createDir(base_dir, "tmp")
        # explicit branches: in the one-line form the conditional expression
        # applied only to the dict, so the status was always True
        if not path:
            return False
        return True, {"parallel_exec_tmp_dir": os.path.join(base_dir, "tmp")}

    def create_sub_tmp_file(self, system_name="", filename="", delete="yes"):
        """
        Create temp file for parallel execution test
        Arguments:
            system_name: system whose "filename" data is read from the
                         datafile when filename is not given
            filename: name of the temp file to be created
            delete: "yes" to delete the file again after the wait
        Returns:
            True on success, False when neither system_name nor filename is
            given or when the file can not be deleted
        """
        path = data_Utils.get_object_from_datarepository("parallel_exec_tmp_dir")
        if filename == "":
            if system_name == "":
                pNote("No system or filename found, needs to provide at least one", "error")
                # nothing to create, do not try to open the directory itself
                return False
            filename = data_Utils.getSystemData(self.datafile, system_name, "filename")

        tmp_file = os.path.join(path, filename)
        with open(tmp_file, "w") as fd:
            fd.write("This is a test string")
        # keep the file around for a while, tmp_file_count counts the files
        time.sleep(10)

        if delete != "yes":
            return True
        try:
            file_Utils.delFile(tmp_file)
        except OSError:
            pNote("Cannot remove tmp file, no write access to {}".format(path), "error")
            return False
        return True

    def tmp_file_count(self, int_count):
        """ count how many files are under the temp dir
        Arguments:
            int_count: expected number of entries in the temp dir
        Returns:
            True if the number of entries equals int_count, else False
        """
        time.sleep(5)
        path = data_Utils.get_object_from_datarepository("parallel_exec_tmp_dir")
        content = os.listdir(path)
        pNote(content)
        pNote(str(len(content)) + str(int_count))
        return len(content) == int_count

    def check_tmp_file_exists(self, system_name="", filename=""):
        """ check if temp file exist in the parallel execution result tmp dir
        Arguments:
            system_name: system whose "filename" data is read from the
                         datafile when filename is not given
            filename: name of the temp file
        Returns:
            the result of file_Utils.fileExists for the file, False when
            neither system_name nor filename is given
        """
        if filename == "":
            if system_name == "":
                pNote("No system or filename found, needs to provide at least one", "error")
                return False
            filename = data_Utils.getSystemData(self.datafile, system_name, "filename")
        path = data_Utils.get_object_from_datarepository("parallel_exec_tmp_dir")
        return file_Utils.fileExists(os.path.join(path, filename))

    def delete_tmp_dir(self):
        """
        Delete temp directory for parallel execution test
        Returns:
            the value returned by file_Utils.delFolder
        """
        path = data_Utils.get_object_from_datarepository("parallel_exec_tmp_dir")
        return file_Utils.delFolder(path)

    def check_kw_arg_type_prefix(self, str_value, int_value, float_value, bool_value,
                                 list_value, tuple_value, dict_value, file_value):
        """This keyword is intended to test the type prefix for keyword arguments
        when an argument name has a type_ prefix, the variable type will become
        the type specified in the type_ prefix
        :Argument:
            1. str_value - expected to be string
            2. int_value - expected to be int
            3. float_value - expected to be float
            4. bool_value - expected to be bool
            5. list_value - expected to be list
            6. tuple_value - expected to be tuple
            7. dict_value - expected to be dict
            8. file_value - expected to be file
        :Returns:
            True if every argument has the expected type and the content of
            file_value matches the expected text, else False
        """
        file_contents = "Checking file datatype in wtags"
        status = True
        err_msg = "{} is not an {} value but of type {}"

        # exact type comparison on purpose, e.g. a bool must not pass as int
        expectations = (
            (str_value, "str", str),
            (int_value, "int", int),
            (float_value, "float", float),
            (bool_value, "bool", bool),
            (list_value, "list", list),
            (tuple_value, "tuple", tuple),
            (dict_value, "dict", dict),
        )
        for value, type_name, expected_type in expectations:
            if type(value) is not expected_type:
                pNote(err_msg.format(value, type_name, type(value)), "error")
                status = False

        if not isinstance(file_value, self._file_types()):
            # this block checks if file_value is file type
            pNote(err_msg.format(file_value, "file", type(file_value)), "error")
            status = False
        else:
            actual_contents = file_value.read().strip()
            if actual_contents != file_contents:
                # this block checks if the contents of file type variable is expected
                pNote("contents of the file {} is <<{}>> which does not match expected"
                      " <<{}>>".format(file_value, actual_contents, file_contents), "error")
                status = False
        return status

    def check_values_from_datafile(self, system_name, strvar, langs, states,
                                   currencys, ramspace, configfile, intvar,
                                   anotherfile):
        """Verify the datatype of the value read from the datafile using either
        the tag or wtag feature
        :Argument:
            1. system_name = system name in the datafile
            2. strvar = string variable
            3. langs = list variable (should get from data file using wtag)
            4. states = tuple variable
            5. currencys = dict variable
            6. ramspace = boolean variable
            7. configfile = file variable
            8. intvar = int variable
            9. anotherfile = file variable
        :Returns:
            True if all the type checks pass, else False
        """
        status = True
        datafile = Utils.config_Utils.datafile
        tc_filepath = os.path.dirname(data_Utils.get_object_from_datarepository(
            'wt_testcase_filepath'))

        for var, varname, datatype in ((strvar, "strvar", str),
                                       (langs, "langs", list),
                                       (states, "states", tuple),
                                       (currencys, "currencys", dict),
                                       (ramspace, "ramspace", bool)):
            status = self._check_type(var, varname, datatype) and status

        file_err = '{} is not a file, please check'
        try:
            # check if tag is present and its functionality is not broken
            if anotherfile.startswith('tag'):
                anotherfile = data_Utils.resolve_argument_value_to_get_tag_value(
                    datafile, system_name, anotherfile)
            # this checks if configfile and anotherfile are valid files
            # by getting the absolute path of the file
            if not os.path.isabs(configfile):
                configfile = file_Utils.getAbsPath(configfile, tc_filepath)
            if not os.path.isabs(anotherfile):
                anotherfile = file_Utils.getAbsPath(anotherfile, tc_filepath)
            # NOTE(review): a path that is not a file is only reported, the
            # status is left untouched - confirm this is intended
            if not os.path.isfile(configfile):
                pNote(file_err.format(configfile), "error")
            if not os.path.isfile(anotherfile):
                pNote(file_err.format(anotherfile), "error")
        except AttributeError:
            pNote('configfile and anotherfile are expected to be files', "error")
            pNote('type of configfile is {}'.format(type(configfile)), "error")
            pNote('type of anotherfile is {}'.format(type(anotherfile)), "error")
            status = False

        if type(intvar) is str and intvar.startswith('tag'):
            # the tag is resolved to prove that resolving works, the resolved
            # value itself is not type checked
            intvar = data_Utils.resolve_argument_value_to_get_tag_value(
                datafile, system_name, intvar)
        else:
            status = self._check_type(intvar, "intvar", int) and status
        return status

    # NOTE: the list/dict defaults below are part of the keyword interface
    # and are kept as they are; they are never mutated inside the method.
    def check_opt_values_from_datafile(self, langs=['Sanskrit', 'Tamil'],
                                       strvar="I am a default variable",
                                       states="wtag=states",
                                       system_name="sys_wtag",
                                       currencys={'USA': 'USD'},
                                       ramspace=False,
                                       configfile="../../config_files/check_file_type",
                                       intvar=496):
        """Verify the datatype of the value read from the datafile using either
        the tag or wtag feature
        :Argument:
            1. system_name = system name in the datafile
            2. strvar = string variable
            3. langs = list variable (should get from data file using wtag)
            4. states = tuple variable
            5. currencys = dict variable
            6. ramspace = boolean variable
            7. configfile = file variable
            8. intvar = int variable
        :Returns:
            True if all the type checks pass, else False
        """
        status = True
        datafile = Utils.config_Utils.datafile
        tc_filepath = os.path.dirname(data_Utils.get_object_from_datarepository(
            'wt_testcase_filepath'))

        for var, varname, datatype in ((strvar, "strvar", str),
                                       (langs, "langs", list),
                                       (states, "states", tuple),
                                       (currencys, "currencys", dict),
                                       (ramspace, "ramspace", bool)):
            status = self._check_type(var, varname, datatype) and status

        file_err = '{} is not a file, please check'
        try:
            # this checks if configfile is a valid file
            # by getting the absolute path of the file
            if not os.path.isabs(configfile):
                configfile = file_Utils.getAbsPath(configfile, tc_filepath)
            # NOTE(review): a path that is not a file is only reported, the
            # status is left untouched - confirm this is intended
            if not os.path.isfile(configfile):
                pNote(file_err.format(configfile), "error")
        except AttributeError:
            # this keyword has no anotherfile argument, report configfile only
            pNote('configfile is expected to be a file', "error")
            pNote('type of configfile is {}'.format(type(configfile)), "error")
            status = False

        if type(intvar) is str and intvar.startswith('tag'):
            # the tag is resolved to prove that resolving works, the resolved
            # value itself is not type checked
            intvar = data_Utils.resolve_argument_value_to_get_tag_value(
                datafile, system_name, intvar)
        else:
            status = self._check_type(intvar, "intvar", int) and status
        return status

    def generate_timestamp_delta(self, stored_delta_key, timestamp_key, desired_status):
        """
        test keyword created for runmode_timer
        Generate a delta from comparing current time with store timestamp
        save the delta and current timestamp in repo for keyword verify_delta
        :Argument:
            stored_delta_key = key name to store the list of delta
            timestamp_key = key name to store the timestamp
            desired_status = user desired status
            input pass->true, fail->false and everything else ->exception
        :Returns:
            status (bool), result_dict (dict) with the current timestamp and,
            when a previous timestamp exists, the updated list of delta
        """
        cur_ts = datetime_utils.get_current_timestamp()
        result_dict = {timestamp_key: cur_ts}
        # raises for any desired_status other than pass/fail
        status = self.local_data_test(desired_status)
        previous_time = data_Utils.get_object_from_datarepository(timestamp_key)
        stored_delta = data_Utils.get_object_from_datarepository(stored_delta_key)
        if previous_time:
            delta = datetime_utils.get_time_delta(previous_time, cur_ts)
            if stored_delta:
                stored_delta.append(delta)
            else:
                stored_delta = [delta]
            result_dict[stored_delta_key] = stored_delta
        return status, result_dict

    def verify_delta(self, delta_key, int_num, float_min_val):
        """
        test keyword created for runmode_timer
        Compare a list of delta to a minimum value
        This is used to ensure runmode is correctly waiting
        for a minimum amount of time (float_min_val)
        :Argument:
            delta_key = key name for the list of delta
            int_num = number of delta required in list of delta
            float_min_val = minimum value of each delta
        :Returns:
            True if the list has int_num entries and every entry is at least
            float_min_val, else False
        """
        status = False
        stored_delta = data_Utils.get_object_from_datarepository(delta_key)
        if stored_delta:
            if len(stored_delta) != int_num:
                pNote("not enough delta value stored in list", "Error")
            else:
                status = all(x >= float_min_val for x in stored_delta)
                if not status:
                    pNote("Delta: {} not meet minimum value {}".format(
                        str(stored_delta), float_min_val))
        return status

    def instantiate_list_key_in_data_repository(self, key):
        """
        This will create a key in the data_repository
        :param key: name of the key that should be created in the data_repository.
        The data type of it's value will be list.
        :return: status (bool), output_dict (dict)
        """
        pNote("This keyword will create a key in the data repository")
        output_dict = {key: []}
        pNote("Updating Data Repository with key: {0}".format(key))
        return True, output_dict

    def update_list_key_in_data_repository(self, key, value, status="True"):
        """
        This keyword will update an existing key in the data repository
        :param key: key name
        :param value: value to be updated
        :param: status: kw will pass/fail accordingly, any text other than
        "false" (case insensitive) makes the keyword pass
        :return: status (bool), updated_dict (dict)
        """
        pNote("This keyword will update an existing key in the data repository")
        status = status.lower() != "false"
        data = data_Utils.get_object_from_datarepository(key)
        data.append(value)
        updated_dict = {key: data}
        pNote("Updating {0} value wih {1}".format(key, format(value)))
        return status, updated_dict

    def verify_list_key_value_in_data_repo(self, key, expected_value):
        """
        This keyword will verify the value of an existing list key in the
        data repository
        :param key: key name
        :param expected_value: comma separated text with the expected items,
        surrounding whitespace of each item is ignored
        :return: status (bool)
        """
        pNote("This keyword will verify an existing key's value")
        data = data_Utils.get_object_from_datarepository(key)
        pNote("{1} Value (as stored in Data Repository): {0}".format(data, key))
        compare_value = [x.strip() for x in expected_value.split(",")]
        pNote("Expected Value: {0}".format(compare_value))
        # same length and no pair of items that differs
        status = (len(data) == len(compare_value) and
                  not any(sub_data != sub_compare
                          for sub_data, sub_compare in zip(data, compare_value)))
        if not status:
            pNote("Expected Value and Existing Value do not match", "error")
        return status