Source code for Framework.Utils.file_Utils

'''
Copyright 2017, Fujitsu Network Communications, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''

# import standard python libraries
import datetime
import time
import os
import sys
import string
import shutil
import zipfile
import string_Utils
from print_Utils import print_info, print_error, print_warning, print_exception

try:
    if 'linux' in sys.platform:
        mod = 'pexpect'
        import pexpect
except Exception:
    print_info("{0}: {1} module is not installed".format(os.path.abspath(__file__), mod))

[docs]def findLastString(filename, searchterm): """ Return the last line of a file""" fd = open (filename, "r") linenum = -1 for i, line in enumerate (fd, 1): if searchterm in line: linenum = i return linenum
[docs]def searchFile(filename, searchterm): """ search file for text, return True or False""" fd = open (filename, 'r') data = fd.read() test = data.find(searchterm) if test < 0 : return False return True
[docs]def searchaftertext(filename, startterm, searchterm): """Start search after a certain text in a file""" #print startterm #print searchterm startline = findLastString (filename, startterm) searchtermfound = findLastString (filename, searchterm) if searchtermfound > startline: return True return False
[docs]def fileExists(fname): """ check if file exists""" filestatus = os.path.isfile(fname) return filestatus
[docs]def dirExists(path): """ check if directory exists""" dirstatus = os.path.isdir(path) return dirstatus
[docs]def pathExists(path): """ check if path exists , does not care if it is file or directory""" pathstatus = os.path.exists(path) return pathstatus
[docs]def delFile(fname): """ check if file exists and delete it""" if fileExists(fname): filestatus = os.remove(fname) return filestatus
[docs]def delFolder(path): """ check if folder exists and delete it with its content""" status = False if dirExists(path): try: shutil.rmtree(path) status = True except OSError: print_error("Cannot remove folder {}".format(path)) return status
# Return time and date
[docs]def getDateTime(time_format=None): """Returns the current year-month-date_hour-minute-second """ if time_format is None: timestamp = datetime.datetime.now().strftime("%y-%m-%d_%H-%M-%S-%f") else: timestamp = datetime.datetime.now().strftime(time_format) return timestamp
[docs]def getCurrentFilePath(): """Will return the file path of file_Utils""" return sys.path[0]
[docs]def getCurrentFileName(): """ Will return the file name of file_Utils. Copy into file needed.""" return os.path.basename (__file__)
[docs]def getDirName(filepath): """ returns the directory component for the path provided""" return os.path.dirname(filepath)
[docs]def getXMLDataFile(filename, path): """Get the xml Datafile for '.py' testcases """ path = path.replace ('Testcases','Data') filename = filename.replace(".py", "") print_info(path + os.sep + filename + os.sep + filename + '.xml') return path + os.sep + filename + os.sep + filename + '.xml'
[docs]def createDirForPyTc(filename, path, dirname): """ method for old directory structure, delete later if not required""" path = path.replace ('Testcases',dirname) filename = filename.replace(".py", "") dirpath = path + os.sep + filename if dirExists(dirpath): #print_info("Directory already exists for this TC: '%s'" % dirpath) return dirpath os.makedirs(dirpath) #print_info("A new directory created for this TC: '%s'" % dirpath) return dirpath
[docs]def createDir(path, dirname): """Creates a new directory with name = dirname under the provided path, ignores if directory with same name already exists Arguments: 1. path = (string) full path of an existing directory where a new directory need to be created 2. dirname = (string) name of the new directory to be created """ if dirExists(path): dirpath = path + os.sep + dirname if dirExists(dirpath): return dirpath try: os.makedirs(dirpath) except Exception,e: print_error(str(e)) #print_info("A new '%s' directory created : '%s'" % (dirname, dirpath)) return dirpath else: print_warning("Directory does not exist in provided path: {0}".format(path)) return False
[docs]def check_and_create_dir(dirpath): """Checks if dir exists in provided path. If dir exists returns True Elseif dir does not exists, tries to create a directory - If dir created successfully, returns True. - If dir creation failed returns false """ status = False if pathExists(dirpath): print_info("Directory exists in provided path '{0}': ".format(dirpath)) status = True elif not pathExists(dirpath): try: os.makedirs(dirpath) except Exception as exception: print_warning("Creating directory at '{0}' failed.".format(dirpath)) print_exception(exception) status = False else: status = True return status
[docs]def createDir_addtimestamp(path, dirname): """Creates a new directory with name = dirname under the provided path, if directory with same name already exists adds date+timestamp to the name and creates a new directory with name = dirname+date&timestamp Arguments: 1. path = (string) full path of an existing directory where a new directory need to be created 2. dirname = (string) name of the new directory to be created """ if dirExists(path): dirpath = os.path.abspath(path) + os.sep + dirname if dirExists(dirpath): dirpath = addTimeDate(dirpath) try: os.makedirs(dirpath) except Exception as exception: # print_exception(exception) print_info("Add date time and try to create directory one more time") dirpath = addTimeDate(dirpath) os.makedirs(dirpath) #print_info("A new '%s' directory created : '%s'" % (dirname, dirpath)) return dirpath else: print_warning("Directory does not exist in provided path: {0}".format(path)) return False
# to be deleted #=============================================================================== # Testcases, Logs, Results directory are at the same level # Creates a new sub-directory under the Results/Logs directory with the dirname provided #=============================================================================== # def createDirForXmlTc(filename, path, dirname): # path = path.replace ('Testcases',dirname) # filename = filename.replace(".xml", "") # dirpath = path + os.sep + filename # if dirExists(dirpath): # #print_info("Directory already exists for this TC: '%s'" % dirpath) # return dirpath # # os.makedirs(dirpath) # #print_info("A new directory created for this TC: '%s'" % dirpath) # return dirpath
[docs]def getFileName(path): """get the file name part from a absolute path""" fname = os.path.split(path)[1] return fname
# to be deleted # def getFile(filename, path, dirname): # dirpath = createDir (filename, path, dirname) # path_split = dirpath.split('/') # fname = path_split[-1] # extension = getExtension (dirname) # print 'dirpath', dirpath # fullpath = dirpath + os.sep + fname + '.' + extension # if fileExists(fullpath): # fullpath = addTimeDate(fullpath) # return fullpath # to be deleted #=============================================================================== # Create a results/logs file for an xml based test case # Assumption: directory structure = Result, Logs, Testcases at same level # #=============================================================================== # def getFileForXmlTc(filename, path, dirname): # dirpath = createDirForXmlTc (path, dirname) # path_split = dirpath.split('/') # fname = path_split[-1] # extension = getExtension (dirname) # fullpath = dirpath + os.sep + fname + '.' + extension # if fileExists(fullpath): # fullpath = addTimeDate(fullpath) # return fullpath
[docs]def get_extension_from_path(path): """Takes a filepath as input and returns the extension (extension is everything from the last dot to the end) If the input is a directory, the extension returned will be empty Arguments: 1. path = path of the file or directory """ if path: extn = os.path.splitext(path)[1] else: extn = "" return extn
[docs]def getExtension(dirname): """get extension of a path""" extension = dirname[:3] return extension
[docs]def getNameOnly(filename): """get file name without extension from a path""" nameonly = filename.split('.')[0] return nameonly
[docs]def get_file_from_remote_server(remote_ip, remote_uname, remote_passwd, src, dest, logfile=None): """ use scp to get file from remote server """ child = pexpect.spawn('scp -r %s@%s:%s %s' % (remote_uname, remote_ip, src, dest )) try: child.logfile = open(logfile, "a") except Exception,e: child.logfile = None child.expect('assword:*') child.sendline(remote_passwd) try: child.expect(pexpect.EOF) except Exception,e: print_error("Import error, error : '%s'" % str(e)) return False else: return True
[docs]def put_file_to_remote_server(remote_ip, remote_uname, remote_passwd, src, dest, logfile=None): """ use scp to put file from remote server """ cmd = 'scp %s %s@%s:%s' % (dest, remote_uname, remote_ip, src) print_info("Running cmd: %s" % cmd) child = pexpect.spawn(cmd) try: child.logfile = open(logfile, "a") except Exception, e: child.logfile = None child.expect('assword:*') child.sendline(remote_passwd) try: child.expect(pexpect.EOF) out = child.before print_info(out) if '100%' in out: return True else: return False except Exception, e: print_error("Import error, error : '%s'" % str(e)) return False
[docs]def incrementFilename(filename, increment): """ add count to filename """ fname = filename.split('.') finalname= fname[0]+'_'+increment+'.'+fname[1] return finalname
# to be deleted # def getDataFile(filename, path):# to be deleted once get_testcase_datafile is implemented # nameonly = getNameOnly(filename) # fullpath = path + os.sep + 'Data' + os.sep + nameonly + '_MainData' + '.xml' # # if not fileExists(fullpath): print_warning('Default Datafile (%s_MainData.xml) not found in default location %s ' % (nameonly, fullpath)) # return fullpath
[docs]def getCustomLogFile(filename, path, custom, ext='.log'): """ append filename of log file with custom string """ nameonly = getNameOnly(filename) fullpath = path + os.sep + nameonly + "_" + custom+ ext if fileExists(fullpath): fullpath = addTimeDate (fullpath) return fullpath
[docs]def getNewExtension(filename, extension): """ Replace the extention of the file""" nameonly = os.path.splitext(filename) fullpath = nameonly[0] + '.' + extension if fileExists(fullpath): fullpath = addTimeDate (fullpath) return fullpath
[docs]def addTimeDate(path): """ add time and date to a path (file/dir)""" if fileExists(path) or dirExists(path) : time.sleep(2) ftime = getDateTime() path = os.path.splitext(path)[0] + "_"+ftime + os.path.splitext(path)[1] return path
[docs]def deleteLastLine(filename): """ delete the last line of a file""" lines = open(filename, 'r').readlines() del lines[-1] open(filename, 'w').writelines(lines)
[docs]def deleteFirstLine(filename): """ Takes a file name as input and deletes the first line""" lines = open(filename, 'r').readlines() del lines[0] open(filename, 'w').writelines(lines)
[docs]def deleteLinesFromFirst(filename): """ Takes a file name as input and deletes the first line""" deleteFirstLine(filename)
[docs]def deletLinesFromLast(filename): """ delete the last line of a file""" deleteLastLine(filename)
[docs]def deleteMatchingFileLines(origfile, newfile, arrValues): """ Searches existing file for text and creates new file without the lines containing the text.""" fin = open(origfile, 'r') fout = open(newfile, 'w') for line in fin: if any(v in line for v in arrValues): print_info("The following was deleted: '%s'" % line) else: fout.write(line) fout.close()
[docs]def deleteLinesUntilMatch(filename, match, startfrom='first'): """ Opens a file and deletes all the lines until a matching line is encoutered # filename = location of the file from which lines are to be deleted # match = the line to be matched with, this is the entire line given in string format # startfrom = 'first' (default), starts the search begining form the first line of the file # and deletes all the lines until matching line is reached # = 'last', starts the search begining form the last line of the file moving upwards # and deletes all the lines until matching line is reached""" lines= open(filename, 'r').readlines() linesReverse = list(reversed(lines)) if startfrom == 'first': for i in lines[:]: if i != match: lines.remove(i) else: break resultantList = list(lines) if startfrom == 'last': for i in linesReverse[:]: if i != match: linesReverse.remove(i) else: break resultantList = list(reversed(linesReverse)) open(filename, 'w+').writelines(resultantList)
[docs]def copyFileContents(srcfile, dstfile): """ copy file from src to dst using append method""" lines=open(srcfile,'r').readlines() open(dstfile, 'a').writelines(lines)
[docs]def getLinesBetweenMatchingLines(srcfile, dstfile, start, end, no_of_search=1): """ This function opens a file, searches for the start and end strings and writes the lines in between start and end string into a new file (start, end inluded) # srcfile = source file which has the data to be parsed # dstfile = destination file to which the captured data is to be written # start = starting string to be serached for # end = ending string to be searched for # no_of_searches = denotes the number of times the file has to be searched for the start and end strings # supported values = any interger greater than 0 # = EOF | eof searches the entire file""" lines = open(srcfile,'r').readlines() #print_info ("lines:%s"% lines) i=0 iteration=0 resultantList =[] if no_of_search == 0: print_error ("no of searches should be a non-zero number") return False while(i<len(lines)): #print lines[i] printable_only = filter(lambda x:x in string.printable, lines[i]) if printable_only == start: resultantList.append(lines[i]) i+=1 while (i<len(lines)): printable_only = filter(lambda x:x in string.printable, lines[i]) if printable_only == end: resultantList.append(lines[i]) iteration+=1 break else: resultantList.append(lines[i]) i+=1 if no_of_search == 'EOF' or no_of_search == 'eof' or iteration < no_of_search: i+=1 elif no_of_search==iteration: break if len(resultantList)==0 : print_error("no match found in the file") open(dstfile,'w+').writelines(resultantList)
[docs]def getSubDirFile(subdir, existing_dir, ext): """ Searches for a sub-directory with provided name under a directory. If it does # not exist creates a new sub-directory # Now under this newly created directory creates file with sub-directory name and # provided extension.If a file with the same name already exists # adds data and # time-stamp to the filename . # Returns the filename thus created to the calling function # # Arguments: # subdir = name of the directory and the file to be created under an existing directory # dir = an existing directory # ext = extension for the newly created filename # # Note: This function does not open the result file, it only creates a name for the file. # The file with the created name has to be opened seperately """ path = createDir_addtimestamp(existing_dir, subdir ) fullpath = path + os.sep + subdir + '.' + ext if fileExists(fullpath): fullpath = addTimeDate(fullpath) return fullpath
[docs]def create_execution_directory(filepath): """Gets a Testcase/Testsuite/Project.xml filepath as input, Checks if a directory called Execution exits at the same level of Testcase/Testsuite/Project directory If it does not exist creates a directory called Execution. """ dirname = getDirName(filepath) execution_folder = os.sep.join(dirname.split(os.sep)[:-1]) + os.sep + 'Execution' if not pathExists(execution_folder): os.makedirs(execution_folder) dirname = getDateTime() execution_dir = createDir_addtimestamp(execution_folder, dirname) return execution_dir
[docs]def create_zipdir(zipname, path, extn='zip'): """zip the path and output to same level""" output_filepath = path + os.sep + zipname zip_file_path = shutil.make_archive(output_filepath, extn, path) return zip_file_path
[docs]def getAbsPath(relative_path, start_directory="."): """This function is used to manipulate the path according to the relative path that is specified :Arguments: 1. @param relative_path: The relative path of directory or file 2. @param start_directory: The actual dir path where the file must be :Returns: 1. @return: Returns the abspath with the relative path specified """ value = False if relative_path and start_directory: relative_path = relative_path.strip() try: #print relative_path os.chdir(start_directory) path = os.path.abspath(relative_path) value = path except Exception, err: print_error("{0} file does not exist in provided path".format(relative_path)) print_error(err) return value
[docs]def get_parent_dir(path, child): """ This function gets the parent directory of any a specified child folder from the given path """ if path.rsplit(os.sep)[-1] == child: path = os.path.dirname(path) else: path = os.path.dirname(path) path = get_parent_dir(path, child) return path
[docs]def check_extension_get_absolute_path(relative_path, start_directory, list_extn=[".json", ".xml", ".txt"]): """ This is wrapper function that gets and verifies extention of a file path and then returns the absolute path. start_directory must be an absolute path """ extension = get_extension_from_path(relative_path) if extension in list_extn: value = getAbsPath(relative_path, start_directory) else: value = relative_path return value
[docs]def get_absolute_path_from_start_directory(relative_path, start_directory, extension=".json"): """ DEPRECATED IN 2.9 Keyword developer must be aware that relative path can be relative to 1. testcase file 2. data file (if path is given in tag= value) start_directory must be an absolute path """ print_error("This function is deprecated in 2.9, use check_extension_get_absolute_path or getAbspath instead") return check_extension_get_absolute_path(relative_path, start_directory, extension)
[docs]def get_absolute_path_of_directory(relative_path_of_dir, start_directory): """ DEPRECATED IN 2.9 When provided with a start directory and a relative path of a directory, this function returns the absolute path. Else returns the relative path Keyword developer must be aware that relative path can be relative to 1. testcase file 2. data file (if path is given in tag= value) start_directory must be an absolute path """ print_error("This function is deprecated in 2.9, use check_extension_get_absolute_path or getAbspath instead") return getAbsPath(relative_path_of_dir, start_directory)
# ============================================================================== # File Operations # ==============================================================================
[docs]def log_result(oper, result): """the methods in file_actions class use this to log the result of its operation """ resmsg = "completed successfully" if result else "failed" msg = "file {} operation {}".format(oper, resmsg) if result: print_info(msg) else: print_error(msg)
[docs]def open_file(newfile, mode): """ Opens the newfile in the mode specified and returns the filedescriptor which would be used by all the other file util operations. :Arguments: newfile - name of the file to be opened mode - mode determines the mode in which the file has to be opened, i.e., read, write, append, etc. r - Opens a file for reading only. The file pointer is placed at the beginning of the file. This is the default mode. rb - Opens a file for reading only in binary format. The file pointer is placed at the beginning of the file. This is the default mode. r+ - Opens a file for both reading and writing. The file pointer placed at the beginning of the file. rb+ - Opens a file for both reading and writing in binary format. The file pointer placed at the beginning of the file. w - Opens a file for writing only. Overwrites the file if the file exists. If the file does not exist, creates a new file for writing. wb - Opens a file for writing only in binary format. Overwrites the file if the file exists. If the file does not exist, creates a new file for writing. w+ - Opens a file for both writing and reading. Overwrites the existing file if the file exists. If the file does not exist, creates a new file for reading and writing. wb+ - Opens a file for both writing and reading in binary format. Overwrites the existing file if the file exists. If the file does not exist, creates a new file for reading and writing. a - Opens a file for appending. The file pointer is at the end of the file if the file exists. That is, the file is in the append mode. If the file does not exist, it creates a new file for writing. ab - Opens a file for appending in binary format. The file pointer is at the end of the file if the file exists. That is, the file is in the append mode. If the file does not exist, it creates a new file for writing. a+ - Opens a file for both appending and reading. The file pointer is at the end of the file if the file exists. The file opens in the append mode. If the file does not exist, it creates a new file for reading and writing. ab+ - Opens a file for both appending and reading in binary format. The file pointer is at the end of the file if the file exists. The file opens in the append mode. If the file does not exist, it creates a new file for reading and writing. :Return: fd - file descriptor of the new file opened """ try: fd = open(newfile, mode) print_info("file {} opened successfully with mode {}".format(newfile, mode)) except IOError as e: print_error("found io exception {} while opening file {} in mode {}". format(str(e), newfile, mode)) return None except Exception as e: print_error("found exception {} while opening file {} in mode {}". format(str(e), newfile, mode)) return None return fd
[docs]def close(fd): """ Close the file. A closed file cannot be read or written any more. :Arguments: fd - file descriptor got from open_file :Return: True/False - based on the success/failure of the operation """ try: name = fd.name status = fd.close() print_info("file {} closed successfully".format(name)) except ValueError: print_warning("file is already closed...") status = True except Exception as e: print_error("found exception {} while closing {}".format(str(e), fd)) status = False return status
[docs]def read(fd, **kwargs): """ Reads at most size bytes from the file (less if the read hits EOF before obtaining size bytes). :Arguments: fd - file descriptor got from open_file :Optional: size - number of bytes to be read :Return: the string read from the file, None if not able to read """ try: readsize = fd.read(**kwargs) print_info("read {} bytes from file {}".format(readsize, fd.name)) except ValueError: print_error("file is already closed...") readsize = 0 except Exception as e: print_error("found exception {} while reading {}".format(str(e), fd)) readsize = 0 return readsize
[docs]def readline(fd, **kwargs): """ Reads one entire line from the file. A trailing newline character is kept in the string. :Arguments: fd - file descriptor got from open_file :Return: the line read """ try: line = fd.readline(**kwargs) print_info("read a line from file "+fd.name) except ValueError: print_error("file is already closed...") line = False except Exception as e: print_error("found exception {} while reading line in {}". format(str(e), fd)) line = False return line
[docs]def readlines(fd, **kwargs): """ Reads until EOF using readline() and return a list containing the lines. If the optional sizehint argument is present, instead of reading up to EOF, whole lines totalling approximately sizehint bytes (possibly after rounding up to an internal buffer size) are read. :Arguments: fd - file descriptor got from open_file :Return: list of lines from the file """ try: lines = fd.readlines(**kwargs) print_info("read all lines from file "+fd.name) except ValueError: print_error("file is already closed...") lines = False except Exception as e: print_error("found exception {} while reading lines in {}". format(str(e), fd)) lines = False return lines
[docs]def truncate(fd, **kwargs): """ Truncates the file's size. If the optional size argument is present, the file is truncated to (at most) that size. :Arguments: fd - file descriptor got from open_file :Optional: size - size up to which to truncate :Return: True/False - based on the success/failure of the operation """ status = False try: fd.truncate(**kwargs) print_info("truncated the file "+fd.name) status = True except ValueError: print_error("file is already closed...") except Exception as e: print_error("found exception {} while truncating {}". format(str(e), fd)) return status
[docs]def write(fd, string): """ Writes a string to the file. :Arguments: fd - file descriptor got from open_file str - string to write :Return: True/False - based on the success/failure of the operation """ status = False try: fd.write(string) print_info("written to file "+fd.name) status = True except ValueError: print_error("file is already closed...") except Exception as e: print_error("found exception {} while writing {}".format(str(e), fd)) return status
[docs]def writelines(fd, seq): """ Writes a sequence of strings to the file. The sequence can be any iterable object producing strings, typically a list of strings. :Arguments: fd - file descriptor got from open_file sequence - sequence of lines to be written :Return: True/False - based on the success/failure of the operation """ status = False try: fd.writelines(seq) print_info("written seq to the file "+fd.name) status = True except ValueError: print_error("file is already closed...") except Exception as e: print_error("found exception {} while writing {}".format(str(e), fd)) return status
[docs]def get_current_position(fd): """ Returns the file's current position :Arguments: fd - file descriptor got from open_file :Return: current position of the file, -1 if error occurred """ curpos = -1 try: curpos = fd.tell() except ValueError: print_error("file is already closed...") except Exception as e: print_error("found exception {} while getting current position on {}". format(str(e), fd)) return curpos
[docs]def move_to_position(fd, offset, **kwargs): """ Sets the file's current position :Arguments: fd - file descriptor got from open_file offset - number of bytes to be moved :Optional: whence - 0 - move offset positions from beginning of file 1 - move offset positions from current position of file 2 - move offset positions from end of file :Return: current position after movement """ curpos = -1 try: if "whence" in kwargs: curpos = fd.seek(offset, kwargs["whence"]) else: curpos = fd.seek(offset) except ValueError: print_error("file is already closed...") except Exception as e: print_error("found exception {} while moving to position on {}". format(str(e), fd)) return curpos
[docs]def move_to_text(fd, pattern, n=1): """ seek to a text in the file :Arguments: pattern - the regular expression pattern to search for in the file n - seek to the nth occurrence from beginning, default first occurrence use negative indices for from the end :Return: True/False - based success or failure of seeking """ pos = -1 fd.seek(0, 0) data = fd.read() try: data_pos = string_Utils.seek_next(pattern, data)[n] pos = fd.seek(data_pos) print_info("moving to {}th pattern {} in file {} successful". format(n, pattern, fd.name)) except IndexError: print_error("pattern {} not found in {}".format(pattern, fd)) except ValueError: print_error("file is already closed...") except Exception as e: print_error("exception {} occurred while seeking pattern {} in {}". format(str(e), pattern, fd)) return pos
[docs]def get_lines_between(fd, startidx, endidx): """ Get lines between args[0] to args[1] :Arguments: startidx - line index from which to send endidx - line index till to be send :Return: lines between startidx and endidx """ lines = [] try: fd.seek(0, 0) all_lines = fd.readlines() lines = all_lines[startidx:endidx] except IndexError: print_error("file has only {} lines, but expecting {} to {} lines". format(len(all_lines), startidx, endidx)) except ValueError: print_error("file is already closed...") except Exception as e: print_error("found exception {} while moving to position on {}". format(str(e), fd)) return lines
[docs]def flush(fd): """ Flush the internal buffer, like stdio's fflush. This may be a no-op on some file-like objects. :Arguments: fd - file descriptor got from open_file :Return: True/False - based on the success/failure of the operation """ status = False try: status = fd.flush() except ValueError: print_error("file is already closed...") except Exception as e: print_error("found exception {} while flushing {}".format(str(e), fd)) return status
[docs]def fileno(fd): """ Returns the integer file descriptor that is used by the underlying implementation to request I/O operations from the operating system. :Arguments: fd - file descriptor got from open_file :Return: integer file descriptor """ ifd = -1 try: ifd = fd.fileno() except ValueError: print_error("file is already closed...") except Exception as e: print_error("found exception {} while getting integer file descriptor " "of {}".format(str(e), fd)) return ifd
[docs]def isatty(fd): """ Returns True if the file is connected to a tty(-like) device, else False. :Arguments: fd - file descriptor got from open_file :Return: True/False - description above """ status = False try: status = fd.isatty() except ValueError: print_error("file is already closed...") except Exception as e: print_error("found exception {} while checking atty of {}". format(str(e), fd)) return status
[docs]def get_next_line(fd): """ Returns the next line from the file each time it is being called. :Arguments: fd - file descriptor got from open_file :Return: next line """ try: line = fd.next() except ValueError: print_error("file is already closed...") line = False except Exception as e: print_error("found exception {} while getting line of {}". format(str(e), fd)) line = False return line
[docs]def get_file_name(fd): """ Returns name of the file. :Arguments: fd - file descriptor got from open_file :Return: name of the file, empty string if there was an exception """ fname = "" try: fname = fd.name except ValueError: print_error("file is already closed...") except Exception as e: print_error("found exception {} while getting file name of {}". format(str(e), fd)) return fname
[docs]def get_file_mode(fd): """ Returns access mode with which file was opened. :Arguments: fd - file descriptor got from open_file :Return: mode """ mode = "" try: mode = fd.mode except ValueError: print_error("file is already closed...") except Exception as e: print_error("found exception {} while getting file mode of {}". format(str(e), fd)) return mode
[docs]def is_softspace_required(fd): """ Returns false if space explicitly required with print, true otherwise. :Arguments: fd - file descriptor got from open_file :Return: True/False - description above """ status = False try: status = fd.softspace except ValueError: print_error("file is already closed...") except Exception as e: print_error("found exception {} while getting file mode of {}". format(str(e), fd)) return status
[docs]def is_file_closed(fd): """ Returns true if file is closed, false otherwise. :Arguments: fd - file descriptor got from open_file :Return: True/False - description above """ status = False try: status = fd.closed except ValueError: print_error("file is already closed...") except Exception as e: print_error("found exception {} while getting file mode of {}". format(str(e), fd)) return status
# ============================================================================== # file meta operations like renaming, deleting etc., # ==============================================================================
[docs]def copyfileobj(fsrc, fdst): """ Copy the contents of the file-like object fsrc to the file-like object fdst. :Arguments: fsrc - file descriptor of the file to be copied fdst - file descriptor of the file on which to be copied :Return: True/False - based on the success/failure of the operation """ status = False try: shutil.copyfileobj(fsrc, fdst) status = True except Exception as e: print_error("copying file {} to file {} raised exception {}". format(fsrc, fdst, str(e))) return status
[docs]def copyfile(src, dst): """ Copy the contents (no metadata) of the file named src to a file named dst. dst must be the complete target file name. :Arguments: src - file to be copied dst - file on which to be copied :Return: True/False - based on the success/failure of the operation """ status = False try: shutil.copyfile(src, dst) print_info("src {} copied to dst {} successfully".format(src, dst)) status = True except Exception as e: print_error("copying file {} to file {} raised exception {}". format(src, dst, str(e))) return status
[docs]def copymode(src, dst): """ Copy the permission bits from src to dst. The file contents, owner, and group are unaffected. src and dst are path names given as strings. :Arguments: src - mode of the file to be copied dst - file on which mode has to be copied :Return: True/False - based on the success/failure of the operation """ status = False try: shutil.copymode(src, dst) print_info("mode of src {} copied to dst {} successfully". format(src, dst)) status = True except Exception as e: print_error("copying file mode from {} to file {} raised exception {}". format(src, dst, str(e))) return status
[docs]def copystat(src, dst): """ Copy the permission bits, last access time, last modification time, and flags from src to dst. The file contents, owner, and group are unaffected. src and dst are path names given as strings. :Arguments: src - permission bits of the file to be copied dst - file on which permission bits has to be copied :Return: True/False - based on the success/failure of the operation """ status = False try: shutil.copystat(src, dst) print_info("metadata of src {} copied to dst {} successfully". format(src, dst)) status = True except Exception as e: print_error("copying file stat from {} to file {} raised exception {}". format(src, dst, str(e))) return status
[docs]def copy(src, dst): """ Copy the file src to the file or directory dst. If dst is a directory, a file with the same basename as src is created (or overwritten) in the directory specified. Permission bits are copied. src and dst are path names given as strings. :Arguments: src - file to be copied dst - file/dir on which to be copied :Return: True/False - based on the success/failure of the operation """ status = False try: shutil.copy(src, dst) print_info("src {} copied to dst {} successfully".format(src, dst)) status = True except Exception as e: print_error("copying file {} to file {} raised exception {}". format(src, dst, str(e))) return status
[docs]def copy2(src, dst): """ Similar to shutil.copy(), but metadata (permissions etc., as mentioned in copy_stat above) is copied as well in fact, this is just shutil.copy() followed by copystat(). This is similar to the Unix command cp -p. :Arguments: src - file and metadata to be copied dst - file/dir on which to be copied :Return: True/False - based on the success/failure of the operation """ status = False try: shutil.copy2(src, dst) print_info("src {} copied to dst {} successfully along with metadata". format(src, dst)) status = True except Exception as e: print_error("copying file {} with metadata to file {} raised exception" " {}".format(src, dst, str(e))) return status
[docs]def move(src, dst): """ Recursively move a file or directory (src) to another location (dst). If the destination is an existing directory, then src is moved inside that directory. If the destination already exists but is not a directory, it may be overwritten depending on os.rename() semantics.If the destination is on the current filesystem, then os.rename() is used. Otherwise, src is copied (using shutil.copy2()) to dst and then removed. :Arguments: src - source file to be moved dst - target file/directory on which to be moved :Return: True/False - based on the success/failure of the operation """ status = False try: shutil.move(src, dst) print_info("move of src {} to dst {} successful".format(src, dst)) status = True except Exception as e: print_error("moving file {} to file {} raised exception {}". format(src, dst, str(e))) return status
[docs]def remove(nfile): """ removes the file from the filesystem :Arguments: nfile - filepath to be removed :Return: True/False - based on the success/failure of the operation """ status = False try: os.remove(nfile) print_info(nfile+" removed from filesystem") status = True except Exception as e: print_error("removing file {} raised exception {}". format(nfile, str(e))) return status
[docs]def recursive_findfile(file_name, src_dir): """ Finds the file_name in the given directory. :Arguments: 1. file_name(string) - name of the file(with extension) to be searched 2. src_dir(string) - path of the dir where the file will be searched :Return: 1. output_path(string) - Path of the file(from src_dir) with extension if the file exists else False """ output_path = False if dirExists(src_dir): for root, _, files in os.walk(src_dir): for f in files: if f == file_name: output = os.path.join(root, f) return output else: print_warning("Directory does not exist in the provided " "path: {}".format(src_dir)) return output_path
[docs]def get_modified_files(src_dir, time_stamp, filetypes=""): """ Finds the modified files(with any one of the filetypes) after a given time_stamp in the src_dir. If filetypes argument is empty, all modified files will be included. :Arguments: 1. src_dir(string) - path of the directory where the files will be 2. time_stamp(int/float) - time stamp value. Ex. time.time() will return current system time in seconds 3. filetypes(sting) - comma separated file types. Ex. ".py, .xml" :Return: 1. modified_files(list) - list of files modified """ modified_files = [] filetypes = tuple([ft.strip() for ft in filetypes.split(',') if ft]) for dirname, _, files in os.walk(src_dir): for fname in files: full_path = os.path.join(dirname, fname) # os.path.getmtime(full_path) mtime = os.stat(full_path).st_mtime if mtime > time_stamp: if filetypes: if full_path.lower().endswith(filetypes): modified_files.append(full_path) else: modified_files.append(full_path) return modified_files
[docs]def convert_to_zip(file_path, compression_type=zipfile.ZIP_DEFLATED): """ Compress the given file based on the compression_type using zipfile module. Name of the zipped file will be same as the given file name :Arguments: 1. file_path - File to be zipped 2. compression_type - ZIP_STORED(no compression) or ZIP_DEFLATED(requires zlib) :Returns: 1. zipped_file - zipped file """ zipped_file = os.path.splitext(file_path)[0] + ".zip" zip_object = zipfile.ZipFile(zipped_file, 'w', compression_type) zip_object.write(file_path, os.path.basename(file_path)) zip_object.close() return zipped_file