Source code for Framework.ClassUtils.netconf

Copyright 2017, Fujitsu Network Communications, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License.
'''  NETCONF client library.
import socket
import traceback
import random
import os
import re
from threading import Thread, Event
from select import select
from binascii import hexlify
from xml.dom.minidom import parseString
from lxml import etree
from datetime import datetime
import paramiko

from Framework.Utils.testcase_Utils import pNote

BUF_SIZE = 65536
NETCONF_DELIM = "]]>]]>"
XML_HEADER = "<?xml version='1.0' encoding='utf-8'?>"
NETCONF_BASE_NS = "urn:ietf:params:xml:ns:netconf:base:1.0"
NETCONF_NTFCN_NS = "urn:ietf:params:xml:ns:netconf:notification:1.0"
NETCONF_GETSCHEMA_NS = "urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring"
NETCONF_DELIM_11 = "\n##\n"

[docs]def connect(host, port, username, password, hostkey_verify=False, protocol_version=""): ''' #creates client instance and returns it if connection success ''' netconf_obj = client() if netconf_obj.connect(host, port, username, password, hostkey_verify, protocol_version): return netconf_obj return None
[docs]class client(Thread): ''' #netconf client class ''' def __init__(self): ''' initialize ''' self.__t = None self.__chan = None self.__sock = None self.__temp_buf = "" Thread.__init__(self) self.setDaemon(True) self.__host_keys = paramiko.HostKeys() self.__wait_resp = Event() self.__wait_resp.clear() self.__wait_rept = Event() self.__wait_rept.clear() self.__response_buffer = "" self.__hello_buffer = "" self.__session_id = None self.__isCOMPLD = False self.__error_message = "" self.__isOpen = False self.__send_data = "" self.__protocol_ver = "1.0" self.__notification_list = [] self.__wait_string = ("", {}) self.__host_name = "" self.__exp_protocol_version = "" self.__notification_list_print = []
[docs] def connect(self, host, port, username, password, hostkey_verify=False, protocol_version=""): ''' #ssh connect # host = hostname or ip (string) # port = port number (integer) # username = login user name (string) # password = password (string) # hostkey_verify = True/False(default) # protocol_version = 1.0/1.1/null (string) ''' pNote("netconf: Connecting to " + host + ":" + str(port)) # connect try: # self.__sock = socket.socket(socket.AF_INET, # socket.SOCK_STREAM, # socket.getprotobyname('tcp')) # self.__sock.settimeout(30) # self.__sock.connect((host, port)) self.__sock = socket.create_connection((host, port), 60) except (socket.error, socket.herror, socket.gaierror, socket.timeout): pNote("netconf: Connection failed", "error") traceback.print_exc() return False # self.__sock.settimeout(None) try: self.__t = paramiko.Transport(self.__sock) try: self.__t.start_client() except paramiko.SSHException: pNote("netconf: SSH negotiation failed", "error") traceback.print_exc() return False except paramiko.SSHException: pNote("netconf: Connection failed", "error") traceback.print_exc() return False # hostkey verify server_key = self.__t.get_remote_server_key() # fingerprint = self.__colonify(hexlify(server_key.get_fingerprint())) if hostkey_verify: self.__load_known_hosts() known_host = self.__host_keys.check(host, server_key) if not known_host: known_host = self.__host_keys.check('[%s]:%s' %(host, port), server_key) if not known_host: pNote("netconf: unknown host", "warning") try: self.__t.auth_password(username, password) except paramiko.AuthenticationException: pNote("netconf: Authentication failed", "error") self.__t.close() return False if not self.__t.is_authenticated(): pNote("netconf: Authentication failed", "error") self.__t.close() return False self.__chan = self.__t.open_session() self.__chan.set_name("netconf") self.__chan.invoke_subsystem("netconf") self.__host_name = host self.__isOpen = True self.__exp_protocol_version = protocol_version # recv thread start self.start() # send hello pdu self.send_hello() return True
def __colonify(self, fp): ''' #format fingerprint with ":" ''' finga = fp[:2] for idx in range(2, len(fp), 2): finga += ":" + fp[idx:idx + 2] return finga def __load_known_hosts(self, filename=None): ''' #load ssh known_hosts setting form a file ''' if filename is None: filename = os.path.expanduser('~/.ssh/known_hosts') try: self.__host_keys.load(filename) except IOError: # for windows filename = os.path.expanduser('~/ssh/known_hosts') try: self.__host_keys.load(filename) except IOError: pass else: self.__host_keys.load(filename)
[docs] def run(self): ''' #start receiving thread ''' pNote("netconf: start receiving thread") self.__receive() pNote("netconf: receiving thread terminated")
def __send(self, data): ''' #send data to host # data = data to send (xml string) ''' ret = True if self.__isOpen: self.__send_data = data self.__response_buffer = "" # self.__notification_buffer = "" # dispdata = data.replace("\n", "") dispdata = re.sub("> +<", "><", dispdata) pNote("netconf send: \n" + \ parseString(dispdata).toprettyxml( indent=" ", encoding="utf-8")) # try: if data.endswith("\n"): data = data[:-1] if self.__protocol_ver == "1.0": data += NETCONF_DELIM elif self.__protocol_ver == "1.1": chunksize = len(data) data = "\n#" + str(chunksize) + "\n" + data data += NETCONF_DELIM_11 while data: n = self.__chan.send(data) if n <= 0: pNote("netconf: send data failed.", "error") self.__error_message = "send data failed" ret = False break data = data[n:] except socket.error as e: pNote(str(e.__class__) + ': ' + str(e), "error") self.__error_message = str(e) traceback.print_exc() ret = False else: pNote("netconf: port not opened", "warning") self.__error_message = "port not opened" ret = False return ret def __receive(self): ''' #receiving thread #if data=rpc-reply, stores in response_buffer, wait_resp flag set, #if rpc-error, isCOMPLD=False, otherwise isCOMPLD=True, #if data=notification, stores in notification_list, #if data=hello, stores in hello_buffer ''' try: while True: if self.__protocol_ver == "1.0": temp_delim = NETCONF_DELIM else: temp_delim = NETCONF_DELIM_11 xml_len = self.__temp_buf.find(temp_delim) if xml_len >= 0: recv_data = self.__temp_buf[:xml_len] if self.__protocol_ver == "1.1": recv_data = re.sub("\n#[0-9].+\n", "", recv_data) self.__temp_buf = self.__temp_buf[xml_len + len(temp_delim):] try: recv_dom = parseString(recv_data) except Exception as e: pNote(str(e), "error") pNote(recv_data) pNote("\nreceived xml is invalid, closing port.\n", "error") self.close() return False resType = recv_dom.documentElement.tagName if resType == "rpc-reply": if len(recv_dom.getElementsByTagName("rpc-error")) == 0: self.__isCOMPLD = True self.__error_message = "" else: self.__isCOMPLD = False sev = recv_dom.getElementsByTagName( "error-severity")[0].childNodes[0].data etyp = recv_dom.getElementsByTagName( "error-type")[0].childNodes[0].data etag = recv_dom.getElementsByTagName( "error-tag")[0].childNodes[0].data if len(recv_dom.getElementsByTagName("error-message")) != 0: # msg = recv_dom.getElementsByTagName("error-message")[0].childNodes[0].data msg = "" if len(recv_dom.getElementsByTagName( "error-message")[0].childNodes) != 0: msg = recv_dom.getElementsByTagName( "error-message")[0].childNodes[0].data else: msg = "" if len(recv_dom.getElementsByTagName("bad-element")) != 0: bele = "" if len(recv_dom.getElementsByTagName( "bad-element")[0].childNodes) != 0: bele = recv_dom.getElementsByTagName( "bad-element")[0].childNodes[0].data else: bele = "" self.__error_message = "%s:%s:%s:%s:%s" % ( sev, etyp, etag, msg, bele) self.__response_buffer += recv_data self.__wait_resp.set() elif resType == "notification": pNote("\n[NETCONF Notification %s from %s]\n%s" % (, self.__host_name, recv_data)) self.__notification_list.append(recv_data) self.__notification_list_print.append(recv_data) elif resType == "hello": self.__hello_buffer = recv_data pNote(recv_data) cap = recv_dom.getElementsByTagName("capability") for c in cap: if c.childNodes[0].data == "urn:ietf:params:netconf:base:1.1": self.__protocol_ver = "1.1" break if self.__exp_protocol_version: if self.__protocol_ver != self.__exp_protocol_version: self.__protocol_ver = "1.0" else: self.__protocol_ver = self.__exp_protocol_version sid = recv_dom.getElementsByTagName( "session-id")[0].childNodes[0].data if sid: self.__session_id = sid else: pNote("session-id could not be found", "error") self.close() return False else: # unknown data type pNote("netconf: unknown type:%s" %resType, "warning") rlist, wlist, xlist = select( [self.__chan], [], [], POLL_INTERVAL) if rlist: data = self.__chan.recv(BUF_SIZE) if data: self.__temp_buf += str(data) else: # in case of something unexpected happens if len(self.__temp_buf) > 0: pNote(self.__temp_buf) self.__error_message = "port closed" self.__wait_resp.set() self.close() return False if len(self.__wait_string) != 0 and self.__wait_string[0]: waitstr = self.__wait_string for notification in self.__notification_list: pNote("Checking notification: " "##{}##".format(notification)) match = False xml = etree.fromstring(notification) temp = xml.xpath(waitstr[0], namespaces=waitstr[1]) if isinstance(temp, bool) and temp: match = True elif isinstance(temp, list) and len(temp) > 0: match = True if match: self.__wait_rept.set() self.__notification_list.remove(notification) break except Exception as e: pNote(str(e), "error") self.__error_message = str(e) traceback.print_exc() self.close() return False return True
[docs] def close(self): ''' #session close ''' try: self.__chan.close() self.__t.close() except Exception as e: pNote(str(e), "error") traceback.print_exc() self.__isOpen = False pNote("netconf: port closed") return True
def __wait_recv_data(self): ''' #wait receive for rpc-reply until timeout expires ''' self.__wait_resp.clear() self.__wait_resp.wait(TIMEOUT_VALUE) if self.__wait_resp.isSet(): if not self.__isOpen: return False else: return True else: pNote("netconf: RESPONSE TIMEOUT", "warning") self.__error_message = "response timeout" return False
[docs] def send_hello(self): ''' #send hello # just send, no wait ''' xml = XML_HEADER xml += "<hello xmlns='%s'>" % NETCONF_BASE_NS xml += "<capabilities>" xml += "<capability>urn:ietf:params:netconf:base:1.0</capability>" if self.__exp_protocol_version != "1.0": xml += "<capability>urn:ietf:params:netconf:base:1.1</capability>" xml += "</capabilities>" xml += "</hello>" return self.__send(xml)
[docs] def rpc(self, xml): ''' #send a rpc xml = xml string to send returns: response data ''' data = XML_HEADER data += "<rpc message-id = '%s' xmlns='%s'>" % (random.randint(1, 1000), NETCONF_BASE_NS) data += xml data += "</rpc>" if self.__send(data): self.__wait_recv_data() return self.__response_buffer
[docs] def get_config(self, source, filter_string=None, filter_type="subtree"): ''' #send get-config rpc source = datastore name (candidate/running/startup) filter_string = filter string, xml string or xpath string filter_type = filter type (subtree or xpath) ''' xml = "" xml += "<get-config>" xml += "<source><%s/></source>" % source if filter_string: if filter_type == "subtree": xml += "<filter type='subtree'>%s</filter>" % filter_string elif filter_type == "xpath": if "'" in filter_string: xml += "<filter type='xpath' select=\"%s\"/>" % filter_string else: xml += "<filter type='xpath' select='%s'/>" % filter_string xml += "</get-config>" return self.rpc(xml)
[docs] def edit_config(self, target, config_string, default_operation=None, test_option=None, error_option=None): ''' #send edit-config rpc target = datastore name (candidate/running) configString = xml string default_operation = merge(default)/replace/none test_option = test-then-set(default)/set/test-only error_option = stop-on-error(default)/continue-on-error/rollback-on-error ''' xml = "" xml += "<edit-config>" xml += "<target><%s/></target>" % target if default_operation: xml += "<default-operation>%s</default-operation>" % default_operation if test_option: xml += "<test-option>%s</test-option>" % test_option if error_option: xml += "<error-option>%s</error-option>" % error_option if config_string.startswith("<config>"): xml += config_string else: xml += "<config>%s</config>" % config_string xml += "</edit-config>" return self.rpc(xml)
[docs] def copy_config(self, target, source): ''' #send copy-config rpc target = destination datastore source = source datastore ''' xml = "" xml += "<copy-config>" xml += "<target><%s/></target>" % target if source.find("config") != -1: xml += "<source>%s</source>" % source else: xml += "<source><%s/></source>" % source xml += "</copy-config>" return self.rpc(xml)
[docs] def delete_config(self, target): ''' #send delete-config rpc target = target datastore ''' xml = "" xml += "<delete-config>" xml += "<target><%s/></target>" % target xml += "</delete-config>" return self.rpc(xml)
[docs] def lock(self, target): ''' #send lock rpc target = target datastore ''' xml = "" xml += "<lock>" xml += "<target><%s/></target>" % target xml += "</lock>" return self.rpc(xml)
[docs] def unlock(self, target): ''' #send unlock rpc target = target datastore ''' xml = "" xml += "<unlock>" xml += "<target><%s/></target>" % target xml += "</unlock>" return self.rpc(xml)
[docs] def get(self, filter_string=None, filter_type=None): ''' #send get rpc filterString = filter string, xml string or xpath string ''' xml = "" xml += "<get>" if filter_string: if filter_type == "subtree": xml += "<filter type='subtree'>%s</filter>" % filter_string elif filter_type == "xpath": if "'" in filter_string: xml += "<filter type='xpath' select=\"%s\"/>" % filter_string else: xml += "<filter type='xpath' select='%s'/>" % filter_string else: xml += "%s" %filter_string xml += "</get>" return self.rpc(xml)
[docs] def close_session(self): ''' #send close-session rpc ''' xml = "" xml += "<close-session/>" return self.rpc(xml)
[docs] def kill_session(self, session_id): ''' #send kill-session rpc session_id = session-id to be killed ! not current session ! ''' xml = "" xml += "<kill-session>" xml += "<session-id>%s</session-id>" % session_id xml += "</kill-session>" return self.rpc(xml)
[docs] def commit(self, confirmed=None, confirm_timeout=None, persist=None, persist_id=None): ''' #send commit rpc confirmed = any string if using confirmed commit confirm_timeout = timeout value in sec. when confirmed commit persist = string of persist-id persist_id = persist-id if specified in previous commit ''' xml = "" xml += "<commit>" if confirmed: xml += "<confirmed/>" if confirm_timeout: xml += "<confirm-timeout>%s</confirm-timeout>" % confirm_timeout if persist: xml += "<persist>%s</persist>" % persist if persist_id: xml += "<persist-id>%s</persist-id>" % persist_id xml += "</commit>" return self.rpc(xml)
[docs] def cancel_commit(self, persist_id=None): ''' #send cancel-commit rpc persist_id = persist-id string ''' xml = "" xml += "<cancel-commit>" if persist_id: xml += "<persist-id>%s</persist-id>" % persist_id xml += "</cancel-commit>" return self.rpc(xml)
[docs] def discard_changes(self): ''' #send discard-changes rpc ''' xml = "" xml += "<discard-changes/>" return self.rpc(xml)
[docs] def validate(self, source="candidate", config=None): ''' #send validate rpc source = datastore ''' xml = "" xml += "<validate>" if source.find("config") != -1: xml += "<source><config>%s</config></source>" % config else: xml += "<source><%s/></source>" % source xml += "</validate>" return self.rpc(xml)
[docs] def create_subscription(self, stream_from=None, filter_string=None, filter_type=None, start_time=None, stop_time=None): ''' #send create-subscription rpc (RFC-5277) stream = stream name (NETCONF/SNMP/syslog...) filterString = xml string or xpath string filterType = subtree or xpath startTime = Start time to replay stopTime = Stop time to replay ''' xml = "" xml += "<create-subscription xmlns='%s'>" % NETCONF_NTFCN_NS if stream_from: xml += "<stream>%s</stream>" % stream_from if filter_string: if filter_type == "xpath": if "'" in filter_string: xml += "<filter type='xpath' select=\"%s\"/>" % filter_string else: xml += "<filter type='xpath' select='%s'/>" % filter_string elif filter_type == "subtree": xml += "<filter type='subtree'>%s</filter>" % filter_string xml += filter_string if start_time: xml += "<startTime>%s</startTime>" % start_time if stop_time: xml += "<stopTime>%s</stopTime>" % stop_time xml += "</create-subscription>" return self.rpc(xml)
[docs] def waitfor_subscription(self, wait_string, timeout=600): '''waitfor a notification event report :ARGUMENTS: wait_string(tuple) = tuple of xpath string and namespace dict( key - prefix value - namespace string) timeout(integer) = timeout in sec. :Returns: True: if successful False: if unsuccessful ''' status = False self.__wait_rept.clear() self.__wait_string = wait_string if len(self.__wait_string) != 0: self.__wait_rept.wait(timeout) if self.__wait_rept.isSet(): status = True # pNote("netconf: waitfor %s received" % wait_string[0]) else: pNote("netconf: waitfor timeouted:%s" % wait_string[0], "warning") self.__wait_string = ("", {}) return status
[docs] def clear_notification_buffer(self): ''' clear the notification buffer ''' self.__notification_list = [] return True
[docs] def clear_notification_print_buffer(self): ''' clear the notification buffer for print ''' self.__notification_list_print = [] return True
[docs] def get_schema(self, identifier, version_number=None, format_type=None): ''' get-schama rpc ''' xml = "" xml += "<get-schema xmlns='%s'>" % NETCONF_GETSCHEMA_NS xml += "<identifier>%s</identifier>" % identifier if version_number: xml += "<version>%s</version>" % version_number if format_type: xml += "<format>%s</format>" % format_type xml += "</get-schema>" return self.rpc(xml)
@property def session_id(self): ''' session-id ''' return self.__session_id @property def send_data(self): ''' previous send data ''' return self.__send_data @property def response_data(self): ''' netconf reply data ''' return self.__response_buffer @property def notification_data(self): ''' netconf notification data ''' # return self.__notification_buffer return self.__notification_list_print @property def capability_data(self): ''' netconf capabilities ''' return self.__hello_buffer @property def isCOMPLD(self): ''' whether rpc comannd gets ok ''' return self.__isCOMPLD @property def isOpen(self): ''' whether port is opened ''' return self.__isOpen @property def error_message(self): ''' error message ''' return self.__error_message @property def current_protocol_version(self): ''' protocol version ''' return self.__protocol_ver