Source code for Framework.ClassUtils.ssh_utils_class

'''
Copyright 2017, Fujitsu Network Communications, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''

"""SSH utility module using the python Paramiko module"""
#!/usr/bin/env python

import os
import time
from time import sleep
from Framework.Utils import testcase_Utils, data_Utils
from Framework.Utils.print_Utils import print_info, print_error, print_debug, print_exception

[docs]class SSHComm(object): """SSHComm class which has methods related to SSH communication using paramiko""" def __init__(self, target, port=22, uid='', pid='', logfile=None): self.param = None self.import_param() self.sshobj = self.param.SSHClient() self.target = target self.uid = uid self.pid = pid self.logfile = logfile self.port = port
[docs] def import_param(self): """Import the paramiko module """ try: import paramiko except ImportError: print_error("Paramiko module is not installed"\ "Please install requests module to"\ "perform any activities related to SSH Communication") else: self.param = paramiko
[docs] def ssh_con(self, retries=1, interval=1, timeout=60, verify_keys=False, invoke_shell=False): """Connects to the host using ssh object :Arguments: 1. retries = No of attempts before returning fail 2. interval = Time to wait before the next retry 3. timeout = wait for response 4. verify_keys = Verify the host entry is available in host key :Returns: 1. status(bool)= True / False """ for attempt in xrange(retries): print_debug("Attempt{} connecting to {}".format(attempt+1, self.target)) try: if not verify_keys: self.sshobj.set_missing_host_key_policy(\ self.param.AutoAddPolicy()) self.sshobj.connect(self.target, self.port, self.uid, self.pid, timeout=timeout, look_for_keys=verify_keys) if invoke_shell: print_info("Opening shell for {}".format(self.sshobj)) self.sshobj.invoke_shell() if self.logfile is not None: self.log = open(self.logfile, 'w') except self.param.SSHException: print_error(" ! could not connect to %s...check logs" % self.target) return False except Exception, err: print_error("Login failed {0}".format(str(err))) sleep(interval) continue else: print_info("Connected to the host") return True return False
[docs] def ssh_close(self): """Close the SSH connection object :Returns: 1. status(bool)= True / False """ try: print_info("Closing SSH connection") self.sshobj.close() print_info("SSH Connection closed") except Exception: print_error("Error occured while closing ssh session") return False else: return True
[docs] def ssh_exec(self, command, timeout=None, get_pty=True): """Execution of the command on the remote host :Arguments: 1.command = command to be executed 2.timeout = wait for response 3.get_pty = request a pseudo-terminal :Returns: 1. out = command output 2. stderr = command's error stream object """ print_info("Executing command : %s" % command) out = "" stdin, stdout, stderr = self.sshobj.exec_command(command, timeout=timeout, get_pty=get_pty) stdin.close() if self.logfile is not None: self.log.write("*"*20) self.log.write("\n") self.log.write("Executing command :%s\n" % command) self.log.write("Output:\n") for line in iter(lambda: stdout.readline(2048), ""): print_info(line) out = out + line if self.logfile: self.log.write(line) self.log.flush() return out, stderr
[docs] def get_response(self, command, timeout=None): """Execute a command using the paramiko SSH object and returns the response string :Arguments: 1. command - commnad to be executed 2. timeout - wait for response :Returns: 1. status(bool)= True / False 2. out/err = response string """ out, stderr = self.ssh_exec(command, timeout) err = stderr.read() if out: return True, out else: print_error("Error:", err) return False, err
[docs] def verify_response(self, command, exp_string, fail_resp, timeout=None): """Execute a command using the paramiko SSH object and returns the response string :Arguments: 1. command - command to be executed 2. exp_string - String to be expected 3. fail_resp - Failure response to be expected 4. timeout - wait for response :Returns: 1. status(bool)= True / False 2. out/err = response string """ out, stderr = self.ssh_exec(command, timeout) err = stderr.read() if out and ((exp_string in out) and (fail_resp not in out)): print_info("Output:", out) return True, out elif out and ((exp_string not in out) or (fail_resp in out)): print_info("Error:", out) return False, out else: print_error("Error:", err) return False, err
[docs] def copy_file(self, remotepath, localpath, filename): """Get the file from remotepath :Argument: 1. remotepath - File path in the remote server 2. localpath - Local path to save the file 3. filename - Filename of the file to be tranferred :Returns: 1. status(bool)= True / False """ sftpobj = self.sshobj.open_sftp() print_info("Copying {} from {} to {}".format(filename, remotepath, localpath)) if filename: remotepath = os.path.join(remotepath, filename) localpath = os.path.join(localpath, filename) sftpobj.get(remotepath, localpath) if os.path.isfile(localpath): return True else: return False
[docs] def send_testdata_cmds(self, testdatafile, **args): """ - Parses the testdata file and - gets the command details for rows marked execute=yes (or) - gets the command details for rows marked execute=yes and row=str_rownum (or) - gets the command details for rows marked execute=yes and title=strtitle - Sends the obtained commands to the paramiko session. - If the commands have verification attribute set, then verifies the verification text for presence/absence as defined in the respective found attribute in the testdatfile. :Arguments: 1. testdatafile = the xml file where command details are available 2. str_rownum = row number of testdata command block to be searched for in the testdata file 3. strtitle = title of testdata command block title to be searched for in the testdata file :Returns: 1. finalresult = boolean """ finalresult = True varconfigfile = args.get('varconfigfile', None) title = args.get('title', None) row = args.get('row', None) details_dict = data_Utils.get_command_details_from_testdata(testdatafile, varconfigfile, title=title, row=row) command_list = details_dict["command_list"] stepdesc = "Send the following commands: %s" %command_list testcase_Utils.pNote(stepdesc) if command_list == False: finalresult = False command_list = [] intsize = len(command_list) # Send Commands for i in range(0, intsize): command = details_dict["command_list"][i] startprompt = details_dict["startprompt_list"][i] endprompt = details_dict["endprompt_list"][i] cmd_timeout = details_dict["timeout_list"][i] retry = details_dict["retry_list"][i] retry_timer = details_dict["retry_timer_list"][i] retry_onmatch = details_dict["retry_onmatch_list"][i] retry_count = details_dict["retry_count_list"][i] sleeptime = details_dict["sleeptime_list"][i] verify_text_list = details_dict["verify_text_list"][i] verify_context_list = details_dict["verify_context_list"][i] sleep = {None: 0, "":0, "none":0}.get(str(sleeptime).lower(), str(sleeptime)) sleep = int(sleep) print("\n") print_debug(">>>") testcase_Utils.pNote("Command #{0}\t: {1}".format(str(i+1), command)) testcase_Utils.pNote("startprompt\t: {0}".format(startprompt)) testcase_Utils.pNote("endprompt\t: {0}".format(endprompt)) testcase_Utils.pNote("sleeptime\t: {0}".format(sleep)) result, response = self.get_response(command_list[i]) if result and result is not 'ERROR': if verify_text_list is not None: result = data_Utils.verify_cmd_response(verify_text_list, verify_context_list, command, response, varconfigfile) command_status = {True: "Pass", False:"Fail", "ERROR":"ERROR"}.get(result) print_info("Command status: {0}".format(command_status)) print_debug("<<<") if sleep > 0: testcase_Utils.pNote("Sleep time of '{0} seconds' requested post command execution".format(sleep)) time.sleep(sleep) if result == "ERROR" or finalresult == "ERROR": result = "ERROR" finalresult = "ERROR" finalresult = finalresult and result return finalresult
[docs] def download_remote_file_sftp(self, remotepath, localpath, filename, \ remotehost, username, password, port=22): """Downloads a remote file from the remote server to the localpath using SFTP :Arguments: 1. remotepath (string) = the remote path to download the file from remotehost 2. localpath (string) = the local path on the local host 3. filename (string) = name of the remote file to download 4. remotehost (string) = remote host 5. username (string) = remote login username 6. password (string) = remote login password :Returns: 1. status(bool) """ status = False transport = self.param.Transport((remotehost, port)) transport.connect(username=username, password=password) sftp = self.param.SFTPClient.from_transport(transport) if filename: try: sftp.get(remotepath + filename, localpath + filename) if os.path.isfile(localpath + filename): print_info("Downloading file from remote server successful") status = True else: print_info("Downloading file from remote server failed") except Exception as exception: print_exception(exception) finally: sftp.close() transport.close() return status
[docs] def invoke_shell(self, term='vt100'): """Opens a terminal for the SSH session :Argument: 1. term = Type of terminal, default:vt100 :Returns: True: if successful False: if unsuccessful """ try: print_info("Opening shell...") self.sshobj.invoke_shell(term=term) return True except self.param.SSHException: print_error("Invoke shell Unsuccessful") return False
[docs] def connect_target_via_host(self, target, user, auth, invoke_shell=False, log=None): """Forward the SSH connection to another target client :Argument: 1. target(string) - Name/ip target machine to be connected 2. user - username to connect 3. auth - password for the user 4. invoke_shell - open shell for passing commands :Returns: 1. target_session - Session object for target connection 2. status(bool)= True / False """ target_session = SSHComm(target, uid=user, pid=auth, logfile=log) status = False try: ssh_transport = self.sshobj.get_transport() channel = ssh_transport.open_channel("direct-tcpip", (target, 22), ('localhost', 0)) print_info("Connecting to target: {}".format(target)) target_session.sshobj.set_missing_host_key_policy(\ self.param.AutoAddPolicy()) target_session.sshobj.connect(target, port=22, username=user, password=auth, sock=channel) if target_session.logfile: target_session.log = open(self.logfile, 'w') print_info("Connection to target: {}successful".format(target)) if invoke_shell: print_info("Opening shell for {}".format(target)) target_session.invoke_shell() status = True except self.param.SSHException as exception: print_exception(exception) except Exception as exception: print_exception(exception) return target_session, status