Python如何实现Paramiko的二次封装

作者:王瑞 时间:2023-07-21 14:46:47 

Paramiko是一个用于执行SSH命令的Python第三方库,使用该库可实现自动化运维的所有任务,如下是一些常用代码的封装方式,多数代码为半成品,只是敲代码时的备份副本防止丢失,仅供参考。

目前本人巡检百台设备完全无压力,如果要巡检过千台则需要多线程的支持,过万台则需要加入智能判断等。

实现命令执行: 直接使用过程化封装,执行CMD命令.


import paramiko

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

def BatchCMD(address,username,password,port,command):
 try:
   ssh.connect(hostname=address,username=username,password=password,port=port,timeout=2)
   stdin , stdout , stderr = ssh.exec_command(command)
   result = stdout.read()
   if len(result) != 0:
     result = str(result).replace("\\n", "\n")
     result = result.replace("b'", "").replace("'", "")
     return result
   else:
     return None
 except Exception:
   return None

 实现磁盘巡检: 获取磁盘空间并返回字典格式


def GetAllDiskSpace(address,username,password,port):
 ref_dict = {}
 cmd_dict = {"Linux\n" : "df | grep -v 'Filesystem' | awk '{print $5 \":\" $6}'",
       "AIX\n" : "df | grep -v 'Filesystem' | awk '{print $4 \":\" $7}'"
       }
 # 首先检测系统版本
 os_version = BatchCMD(address,username,password,port,"uname")
 for version,run_cmd in cmd_dict.items():
   if(version == os_version):
     # 根据不同版本选择不同的命令
     os_ref = BatchCMD(address,username,password,port,run_cmd)
     ref_list= os_ref.split("\n")
     # 循环将其转换为字典
     for each in ref_list:
       # 判断最后是否为空,过滤最后一项
       if each != "":
         ref_dict[str(each.split(":")[1])] = str(each.split(":")[0])
 return ref_dict

# 磁盘巡检总函数
def DiskMain():
 with open("db.json", "r", encoding="utf-8") as read_fp:
   load_json = read_fp.read()
   js = json.loads(load_json)
   base = js.get("base")
   count = len(base)

for each in range(0,count):
     print("\033[37m-\033[0m" * 80)
     print("\033[35m 检测地址: {0:10} \t 用户名: {1:10} \t 密码: {2:10} \t 端口: {3:4}\033[0m".
       format(base[each][1],base[each][2],base[each][3],base[each][4]))
     print("\033[37m-\033[0m" * 80)

ref = GetAllDiskSpace(base[each][1],base[each][2],base[each][3],base[each][4])
     for k,v in ref.items():
       # 判断是否存在空盘
       if( v.split("%")[0] != "-"):
         # 将占用百分比转换为整数
         space_ret = int(v.split("%")[0])
         if space_ret >= 70:
           print("\033[31m 磁盘分区: {0:30} \t 磁盘占用: {1:5} \033[0m".format(k,v))
           continue
         if space_ret >= 50:
           print("\033[33m 磁盘分区: {0:30} \t 磁盘占用: {1:5} \033[0m".format(k, v))
           continue
         else:
           print("\033[34m 磁盘分区: {0:30} \t 磁盘占用: {1:5} \033[0m".format(k, v))
           continue
     print()

# 组内传递用户名密码时调用此方法
def GroupDiskMain(address,username,password,port):
 ref = GetAllDiskSpace(address,username,password,port)
 for k, v in ref.items():
   if (v.split("%")[0] != "-"):
     space_ret = int(v.split("%")[0])
     if space_ret >= 70:
       print("磁盘分区: {0:30} \t 磁盘占用: {1:5} -> [警告]".format(k, v))
       continue
     if space_ret >= 50:
       print("磁盘分区: {0:30} \t 磁盘占用: {1:5} -> [警惕]".format(k, v))
       continue
     else:
       print("磁盘分区: {0:30} \t 磁盘占用: {1:5} -> [正常]".format(k, v))
       continue
 print()

获取系统内存利用率: 获取系统内存利用率


def GetAllMemSpace(address,username,password,port):
 cmd_dict = {"Linux\n" : "cat /proc/meminfo | head -n 2 | awk '{print $2}' | xargs | awk '{print $1 \":\" $2}'",
       "AIX\n" : "df | grep -v 'Filesystem' | awk '{print $4 \":\" $7}'"
       }
 # 首先检测系统版本
 os_version = BatchCMD(address,username,password,port,"uname")
 for version,run_cmd in cmd_dict.items():
   if(version == os_version):
     # 根据不同版本选择不同的命令
     os_ref = BatchCMD(address,username,password,port,run_cmd)

# 首先现将KB转化为MB
     mem_total = math.ceil( int(os_ref.split(":")[0].replace("\n","")) / 1024)
     mem_free = math.ceil(int(os_ref.split(":")[1].replace("\n","")) / 1024)
     mem_used = str( int(mem_total) - int(mem_free))

# 计算占用空间百分比
     percentage = 100 - int(mem_free / int(mem_total / 100))

print("内存总计空间: {}".format(str(mem_total) + " MB"))
     print("内存剩余空间: {}".format(str(mem_free) + " MB"))
     print("内存已用空间: {}".format(str(mem_used) + " MB"))
     print("计算百分比: {}".format(str(percentage) + " %"))

获取系统进程信息: 获取系统进程信息,并返回字典格式


def GetAllProcessSpace(address,username,password,port):
 ref_dict = {}
 cmd_dict = {"Linux\n" : "ps aux | grep -v 'USER' | awk '{print $2 \":\" $11}' | uniq",
       "AIX\n" : "ps aux | grep -v 'USER' | awk '{print $2 \":\" $12}' | uniq"
       }
 os_version = BatchCMD(address,username,password,port,"uname")
 for version,run_cmd in cmd_dict.items():
   if(version == os_version):
     os_ref = BatchCMD(address, username, password, port, run_cmd)
     ref_list = os_ref.split("\n")
     for each in ref_list:
       if each != "":
         ref_dict[str(each.split(":")[0])] = str(each.split(":")[1])
 return ref_dict

# 巡检进程是否存在
def ProcessMain():
 with open("db.json", "r", encoding="utf-8") as read_fp:
   load_json = read_fp.read()
   js = json.loads(load_json)

process = js.get("process")
   process_count = len(process)
   for x in range(0,process_count):
     # 根据process中的值查询base中的账号密码
     base = js.get("base")
     if( list(process[x].keys())[0] == base[x][0] ):
       # 拿到账号密码之后再提取出他们的进程ID于进程名
       print("\033[37m-\033[0m" * 80)
       print("\033[35m 检测地址: {0:10} \t 用户名: {1:10} \t 密码: {2:10} \t 端口: {3:4}\033[0m".
          format(base[x][1], base[x][2], base[x][3], base[x][4]))
       print("\033[37m-\033[0m" * 80)

ref_dic = GetAllProcessSpace(base[x][1],base[x][2],base[x][3],base[x][4])
       # ref_val = 全部进程列表 proc_val = 需要检测的进程列表
       ref_val = list(ref_dic.values())
       proc_val = list(process[x].values())[0]
       # 循环比较是否在列表中
       for each in proc_val:
         flag = each in ref_val
         if(flag == True):
           print("\033[34m 进程: {0:50} 状态: √ \033[0m".format(each))
         else:
           print("\033[31m 进程: {0:50} 状态: × \033[0m".format(each))

实现剧本运行功能: 针对特定一台主机运行剧本功能,随便写的一个版本,仅供参考


def RunRule(address,username,password,port,playbook):
 os_version = BatchCMD(address,username,password,port,"uname")
 if(os_version == list(playbook.keys())[0]):
   play = list(playbook.values())[0]
   print()
   print("\033[37m-\033[0m" * 130)
   print("\033[35m 系统类型: {0:4} \t 地址: {1:10} \t 用户名: {2:10} \t 密码: {3:15} \t 端口: {4:4}\033[0m"
      .format(os_version.replace("\n",""),address,username,password,port))
   print("\033[37m-\033[0m" * 130)

for each in range(0,len(play)):
     RunCmd = play[each] + " > /dev/null 2>&1 && echo $?"
     print("\033[30m [>] 派发命令: {0:100} \t 状态: {1:5} \033[0m".format(
       RunCmd.replace(" > /dev/null 2>&1 && echo $?", ""),"正在派发"))

os_ref = BatchCMD(address, username, password, port, RunCmd)
     if(os_ref == "0\n"):
       print("\033[34m [√] 运行命令: {0:100} \t 状态: {1:5} \033[0m".format(
         RunCmd.replace(" > /dev/null 2>&1 && echo $?",""),"派发完成"))
     else:
       print("\033[31m [×] 运行命令: {0:100} \t 状态: {1:5} \033[0m".format(
         RunCmd.replace(" > /dev/null 2>&1 && echo $?",""),"派发失败"))
       # 既然失败了,就把剩下的也打出来吧,按照失败处理
       for x in range(each+1,len(play)):
         print("\033[31m [×] 运行命令: {0:100} \t 状态: {1:5} \033[0m".format(
           play[x].replace(" > /dev/null 2>&1 && echo $?", ""), "终止执行"))
       break
 else:
   return 0

# 批量: 传入主机组不同主机执行不同剧本
def RunPlayBook(HostList,PlayBook):
 count = len(HostList)
 error = []
 success = []
 for each in range(0,count):
   ref = RunRule(HostList[each][0],HostList[each][1],HostList[each][2],HostList[each][3],PlayBook)
   if ref == 0:
     error.append(HostList[each][0])
   else:
     success.append(HostList[each][0])
 print("\n\n")
 print("-" * 130)
 print("执行清单")
 print("-" * 130)
 for each in success:
   print("成功主机: {}".format(each))
 for each in error:
   print("失败主机: {}".format(each))

# 运行测试
def PlayBookRun():
 playbook = \
   {
     "Linux\n":
       [
         "ifconfig",
         "vmstat",
         "ls",
         "netstat -an",
         "ifconfis",
         "cat /etc/passwd | grep 'root' | awk '{print $2}'"
       ]
   }

addr_list = \
   [
     ["192.168.1.127", "root", "1233", "22"],
     ["192.168.1.126", "root", "1203", "22"]
   ]

# 指定addr_list这几台机器执行playbook剧本
 RunPlayBook(addr_list,playbook)

过程化实现文件上传下载: 文件传输功能 PUT上传 GET下载


def BatchSFTP(address,username,password,port,soruce,target,flag):
 transport = paramiko.Transport((address, int(port)))
 transport.connect(username=username, password=password)
 sftp = paramiko.SFTPClient.from_transport(transport)
 if flag == "PUT":
   try:
     ret = sftp.put(soruce, target)
     if ret !="":
       transport.close()
       return 1
     else:
       transport.close()
       return 0
     transport.close()
   except Exception:
     transport.close()
     return 0
 elif flag == "GET":
   try:
     target = str(address + "_" + target)
     os.chdir("./recv_file")
     ret = sftp.get(soruce, target)
     if ret != "":
       transport.close()
       return 1
     else:
       transport.close()
       return 0
     transport.close()
   except Exception:
     transport.close()
     return 0

# 批量将本地文件 source 上传到目标 target 中
def PutRemoteFile(source,target):
 with open("db.json", "r", encoding="utf-8") as fp:
   load_json = fp.read()
   js = json.loads(load_json)
   base = js.get("base")

print("-" * 130)
   print("接收主机 \t\t 登录用户 \t 登录密码 \t 登录端口 \t 本地文件 \t\t 传输到 \t\t\t 传输状态")
   print("-" * 130)

for each in range(0,len(base)):
     # 先判断主机是否可通信
     ref = BatchCMD(base[each][1], base[each][2], base[each][3], base[each][4],"uname")
     if ref == None:
       print("\033[31m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 未连通\033[0m".format(
         base[each][1],base[each][2],base[each][3],base[each][4],source,target))
       continue

ref = BatchSFTP(base[each][1],base[each][2],base[each][3],base[each][4],source,target,"PUT")
     if(ref == 1):
       print("\033[34m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 传输成功\033[0m".format(
         base[each][1],base[each][2],base[each][3],base[each][4],source,target))
     else:
       print("\033[31m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 传输失败\033[0m".format(
         base[each][1], base[each][2], base[each][3], base[each][4], source, target))

# 批量将目标文件拉取到本地特定目录(存在缺陷)
def GetRemoteFile(source,target):
 with open("db.json", "r", encoding="utf-8") as fp:
   load_json = fp.read()
   js = json.loads(load_json)
   base = js.get("base")

print("-" * 130)
   print("发送主机 \t\t 登录用户 \t 登录密码 \t 登录端口 \t\t 远程文件 \t\t 拉取到 \t\t\t 传输状态")
   print("-" * 130)

for each in range(0,len(base)):
     ref = BatchCMD(base[each][1], base[each][2], base[each][3], base[each][4], "uname")
     if ref == None:
       print("\033[31m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 未连通\033[0m".format(
         base[each][1], base[each][2], base[each][3], base[each][4], source, target))
       continue

ref = BatchSFTP(base[each][1],base[each][2],base[each][3],base[each][4],source,target,"GET")
     if(ref == 1):
       print("\033[34m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 传输成功\033[0m".format(
         base[each][1],base[each][2],base[each][3],base[each][4],source,target))
     else:
       print("\033[31m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 传输失败\033[0m".format(
         base[each][1], base[each][2], base[each][3], base[each][4], source, target))

另一种命令执行方法:


import paramiko

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
def BatchCMD(address,username,password,port,command):
 try:
   ssh.connect(hostname=address,username=username,password=password,port=port,timeout=2)
   stdin , stdout , stderr = ssh.exec_command(command)
   result = stdout.read()
   if len(result) != 0:
     return result
   else:
     return -1
 except Exception:
   return -1

# 通过获取主机Ping状态
def GetPing():
 fp = open("unix_base.db", "r", encoding="utf-8")
 count = len(open("unix_base.db", "r", encoding="utf-8").readlines())
 print("-" * 100)
 print("{0:20} \t {1:10} \t {2:13} \t {3:5} \t {4:9} \t {5:40}".format("IP地址","机器系统","设备SN","机房位置","存活状态","主机作用"))
 print("-" * 100)
 for each in range(count):
   ref = eval(fp.readline())
   ret = BatchCMD(ref[0],ref[5],ref[6],22,"pwd | echo $?")
   if(int(ret)==0):
     print("{0:20} \t {1:10} \t {2:11} \t {3:5} \t {4:9} \t {5:40}".
        format(ref[0],ref[1],ref[2],ref[3],"正常",ref[4]))
   else:
     print("{0:20} \t {1:10} \t {2:13} \t {3:5} \t {4:9} \t {5:40}".
        format(ref[0],ref[1],ref[2],ref[3],"异常",ref[4]))
 fp.close()

# ps aux | grep "usbCfgDev" | grep -v "grep" | awk {'print $2'}
def GetProcessStatus():
 fp = open("unix_process.db", "r", encoding="utf-8")
 count = len(open("unix_process.db", "r", encoding="utf-8").readlines())
 for each in range(count):
   proc = eval(fp.readline())
   proc_len = len(proc)
   print("-" * 70)
   print("---> 巡检地址: {0:10} \t 登录用户: {1:7} \t 登录密码: {2:10}".format(proc[0],proc[1],proc[2]))
   print("-" * 70)
   for process in range(3, proc_len):
     command = "ps aux | grep \'{}\' | grep -v \'grep\' | awk '{}' | head -1".format(proc[process],"{print $2}")
     try:
       ref = BatchCMD(proc[0],proc[1],proc[2],22,command)
       if(int(ref)!=-1):
         print("进程: {0:18} \t PID: {1:10} \t 状态: {2}".format(proc[process], int(ref),"√"))
       else:
         print("进程: {0:18} \t PID:{1:10} \t 状态: {2}".format(proc[process], 0,"×"))
     except Exception:
       print("进程: {0:18} \t PID:{1:10} \t 状态: {2}".format(proc[process], 0,"×"))
   print()
 fp.close()

def GetDiskStatus():
 fp = open("unix_disk.db", "r", encoding="utf-8")
 count = len(open("unix_disk.db", "r", encoding="utf-8").readlines())
 for each in range(count):
   proc = eval(fp.readline())
   proc_len = len(proc)
   print("-" * 100)
   print("---> 巡检地址: {0:10} \t 登录系统: {1:7} \t 登录账号: {2:10} 登录密码: {3:10}".
      format(proc[0],proc[1],proc[2],proc[3]))
   print("-" * 100)
   try:
     ref = BatchCMD(proc[0], proc[2], proc[3], 22, "df | grep -v 'Filesystem'")
     st = str(ref).replace("\\n", "\n")
     print(st.replace("b'", "").replace("'", ""))
   except Exception:
     pass
   print()
 fp.close()

# 运行命令
def RunCmd(command,system):
 fp = open("unix_disk.db", "r", encoding="utf-8")
 count = len(open("unix_disk.db", "r", encoding="utf-8").readlines())
 for each in range(count):
   proc = eval(fp.readline())
   proc_len = len(proc)

if proc[1] == system:
     print("-" * 100)
     print("---> 巡检地址: {0:10} \t 登录系统: {1:7} \t 登录账号: {2:10} 登录密码: {3:10}".
        format(proc[0],proc[1],proc[2],proc[3]))
     print("-" * 100)
     try:
       ref = BatchCMD(proc[0], proc[2], proc[3], 22, command)
       st = str(ref).replace("\\n", "\n")
       print(st.replace("b'", "").replace("'", ""))
     except Exception:
       pass
 fp.close()

面向对象的封装方法: 使用面向对象封装,可极大的提高复用性。


import paramiko

class MySSH:
 def __init__(self,address,username,password,default_port = 22):
   self.address = address
   self.default_port = default_port
   self.username = username
   self.password = password

self.obj = paramiko.SSHClient()
   self.obj.set_missing_host_key_policy(paramiko.AutoAddPolicy())
   self.obj.connect(self.address,self.default_port,self.username,self.password)
   self.objsftp = self.obj.open_sftp()

def BatchCMD(self,command):
   stdin , stdout , stderr = self.obj.exec_command(command)
   result = stdout.read()
   if len(result) != 0:
     result = str(result).replace("\\n", "\n")
     result = result.replace("b'", "").replace("'", "")
     return result
   else:
     return None

def GetRemoteFile(self,remotepath,localpath):
   self.objsftp.get(remotepath,localpath)

def PutLocalFile(self,localpath,remotepath):
   self.objsftp.put(localpath,remotepath)

def GetFileSize(self,file_path):
   ref = self.BatchCMD("du -s " + file_path + " | awk '{print $1}'")
   return ref

def CloseSSH(self):
   self.objsftp.close()
   self.obj.close()

if __name__ == '__main__':
 ssh = MySSH('192.168.191.3','root','1233',22)

ref = ssh.BatchCMD("ifconfig")
 print(ref)

sz = ssh.GetFileSize("/etc/passwd")
 print(sz)
 ssh.CloseSSH()
第二次封装完善。

import paramiko,os,json,re

class MySSH:
 def __init__(self,address,username,password,default_port = 22):
   self.address = address
   self.default_port = default_port
   self.username = username
   self.password = password
   try:
     self.obj = paramiko.SSHClient()
     self.obj.set_missing_host_key_policy(paramiko.AutoAddPolicy())

self.obj.connect(self.address,self.default_port,self.username,self.password,timeout=3,allow_agent=False,look_for_keys=False)
     self.objsftp = self.obj.open_sftp()
   except Exception:
     pass

def BatchCMD(self,command):
   try:
     stdin , stdout , stderr = self.obj.exec_command(command,timeout=3)
     result = stdout.read()
     if len(result) != 0:
       result = str(result).replace("\\n", "\n")
       result = result.replace("b'", "").replace("'", "")
       return result
     else:
       return None
   except Exception:
     return None

def GetRemoteFile(self,remote_path,local_path):
   try:
     self.objsftp.get(remote_path,local_path)
     return True
   except Exception:
     return False

def PutLocalFile(self,localpath,remotepath):
   try:
     self.objsftp.put(localpath,remotepath)
     return True
   except Exception:
     return False

def CloseSSH(self):
   self.objsftp.close()
   self.obj.close()

# 获取文件大小
 def GetFileSize(self,file_path):
   ref = self.BatchCMD("du -s " + file_path + " | awk '{print $1}'")
   return ref.replace("\n","")
 # 判断文件是否存在
 def IsFile(self,file_path):
   return self.BatchCMD("[ -e {} ] && echo 'True' || echo 'False'".format(file_path))

通过Eval函数解析执行: 自定义语法规则与函数,通过Eval函数实现解析执行. 没写完,仅供参考。


import json,os,sys,math
import argparse,time,re
import paramiko

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

def BatchCMD(address,username,password,port,command):
 try:
   ssh.connect(hostname=address,username=username,password=password,port=port,timeout=2)
   stdin , stdout , stderr = ssh.exec_command(command)
   result = stdout.read()
   if len(result) != 0:
     result = str(result).replace("\\n", "\n")
     result = result.replace("b'", "").replace("'", "")
     return result
   else:
     return None
 except Exception:
   return None

# ------------------------------------------------------------------------
# 内置解析方法

def GetDisk(x):
 return str(x)

def GetCPULoad():
 return str(10)

# 句式解析器,解析句子并执行
def judge(string):
 # 如果匹配到IF则执行判断条件解析
 if re.findall(r'IF{ (.*?) }', string, re.M) != []:
   # 则继续提取出他的表达式
   ptr = re.compile(r'IF[{] (.*?) [}]',re.S)
   subject = re.findall(ptr,string)[0]
   subject_list = subject.split(" ")
   # 拼接语句并执行
   sentence = eval(subject_list[0]) + subject_list[1] + subject_list[2]
   # 组合后执行,返回结果
   if eval(sentence):
     return "IF"
   else:
     return False

# 如果匹配到put则执行上传解析
 elif re.findall(r'PUT{ (.*?) }', string, re.M) != []:
   print("put")
 return False

# 获取特定目录下所有的剧本
def GetAllRule():
 rootdir = os.getcwd() + "\\rule\\"
 all_files = [f for f in os.listdir(rootdir)]
 print("-" * 90)
 print("{0:15} \t {1:10} \t {2:10} \t {3:5} \t {4:5}".format("剧本名称","应用平台","应用端口","执行主机数","命令条数"))
 print("-" * 90)
 for switch in all_files:
   # 首先判断文件结尾是否为Json
   if( switch.endswith(".json") == True):
     all_switch_dir = rootdir + switch
     try:
       # 判断文件内部是否符合JSON规范
       with open(all_switch_dir , "r" ,encoding="utf-8") as read_file:
         # 判断是否存在指定字段来识别规范
         load = json.loads(read_file.read())
         if load.get("framework") != None and load.get("task_sequence") != None:
           run_addr_count = len(load.get("address_list"))
           command_count = len(load.get("task_sequence"))
           print("{0:15} \t {1:10} {2:10} \t\t {3:5} \t\t {4:5}".
              format(switch,load.get("framework"),load.get("default_port"),run_addr_count,command_count))
     except ValueError:
       pass

# 指定一个剧本并运行
def RunPlayBook(rule_name):
 rootdir = os.getcwd() + "\\rule\\"
 all_files = [f for f in os.listdir(rootdir)]
 for switch in all_files:
   if( switch.endswith(".json") == True):
     all_switch_dir = rootdir + switch
     # 寻找到需要加载的剧本地址
     if( switch == rule_name):
       with open(all_switch_dir , "r" ,encoding="utf-8") as read_file:
         data = json.loads(read_file.read())
         address_list = data.get("address_list")
         # 循环每个主机任务
         for each in address_list:
           # 得到剧本内容
           task_sequence = data.get("task_sequence")
           default_port = data.get("default_port")
           print("-" * 90)
           print("地址: {0:15} 用户名: {1:10} 密码: {2:10}".format(each[0],each[1],each[2]))
           print("-" * 90)
           for task in task_sequence:
             flag = judge(task[0])
             if flag == "IF":
               ref = BatchCMD(each[0],each[1],each[2],default_port,task[1])
               print(ref)
             elif flag == False:
               ref = BatchCMD(each[0],each[1],each[2],default_port,task[0])
               print(ref)

if __name__ == "__main__":
 RunPlayBook("get_log.json")

定义剧本规范如下。


{
"framework": "Centos",
"version": "7.0",
"address_list":
[
 ["192.168.191.3","root","1233"]
],
"default_port": "22",
"task_sequence":
[
 ["ifconfig"],
 ["IF{ GetLastCmdFlag() == True }","uname"]
]
}

词法分析: 词法分析解析剧本内容。


# 获取特定目录下所有的剧本
def GetAllRule():
 rootdir = os.getcwd() + "\\rule\\"
 all_files = [f for f in os.listdir(rootdir)]
 print("-" * 90)
 print("{0:15} \t {1:10} \t {2:10} \t {3:5} \t {4:5}".format("剧本名称","应用平台","应用端口","执行主机数","命令条数"))
 print("-" * 90)
 for switch in all_files:
   # 首先判断文件结尾是否为Json
   if( switch.endswith(".json") == True):
     all_switch_dir = rootdir + switch
     try:
       # 判断文件内部是否符合JSON规范
       with open(all_switch_dir , "r" ,encoding="utf-8") as read_file:
         # 判断是否存在指定字段来识别规范
         load = json.loads(read_file.read())
         if load.get("framework") != None and load.get("task_sequence") != None:
           run_addr_count = len(load.get("address_list"))
           command_count = len(load.get("task_sequence"))
           print("{0:15} \t {1:10} {2:10} \t\t {3:5} \t\t {4:5}".
              format(switch,load.get("framework"),load.get("default_port"),run_addr_count,command_count))
     except ValueError:
       pass

# 句式解析器,解析句子并执行
def judge(string):
 # 如果匹配到IF则执行判断条件解析
 if re.findall(r'IF{ (.*?) }', string, re.M) != []:
   # 则继续提取出他的表达式
   ptr = re.compile(r'IF[{] (.*?) [}]',re.S)
   subject = re.findall(ptr,string)[0]
   subject_list = subject.split(" ")

# 公开接口,执行命令
   ssh = MySSH("192.168.191.3","root","1233","22")

# 组合命令并执行
   sentence = str(eval(subject_list[0]) + subject_list[1] + subject_list[2])
   if eval(sentence):
     return "IF",ssh
   else:
     return False

# 如果匹配到put则执行上传解析
 elif re.findall(r'PUT{ (.*?) }', string, re.M) != []:
   print("put")
 return False

# 指定一个剧本并运行
def RunPlayBook(rule_name):
 rootdir = os.getcwd() + "\\rule\\"
 all_files = [f for f in os.listdir(rootdir)]
 for switch in all_files:
   if( switch.endswith(".json") == True):
     all_switch_dir = rootdir + switch
     # 寻找到需要加载的剧本地址
     if( switch == rule_name):
       with open(all_switch_dir , "r" ,encoding="utf-8") as read_file:
         data = json.loads(read_file.read())
         address_list = data.get("address_list")
         # 循环每个主机任务
         for each in address_list:
           # 得到剧本内容
           task_sequence = data.get("task_sequence")
           default_port = data.get("default_port")
           print("-" * 90)
           print("地址: {0:15} 用户名: {1:10} 密码: {2:10}".format(each[0],each[1],each[2]))
           print("-" * 90)
           for task in task_sequence:

flag,obj = judge(task[0])

if flag == "IF":
               ret = obj.BatchCMD(task[1])
               print(ret)
if __name__ == '__main__':
 ret = judge("IF{ ssh.GetFileSize('/etc/passwd') >= 4 }")
 print(ret)

MySSH类最终封装: 通过面向对象对其进行封装,实现了查询CPU,负载,内存利用率,磁盘容量,等通用数据的获取。


import paramiko, math,json

class MySSH:
 def __init__(self, address, username, password, default_port):
   self.address = address
   self.default_port = default_port
   self.username = username
   self.password = password
 # 初始化,远程模块
 def Init(self):
   try:
     self.ssh_obj = paramiko.SSHClient()
     self.ssh_obj.set_missing_host_key_policy(paramiko.AutoAddPolicy())
     self.ssh_obj.connect(self.address, self.default_port, self.username, self.password, timeout=3,
                allow_agent=False, look_for_keys=False)
     self.sftp_obj = self.ssh_obj.open_sftp()
   except Exception:
     return False
 # 执行非交互命令
 def BatchCMD(self, command):
   try:
     stdin, stdout, stderr = self.ssh_obj.exec_command(command, timeout=3)
     result = stdout.read()
     if len(result) != 0:
       result = str(result).replace("\\n", "\n")
       result = result.replace("b'", "").replace("'", "")
       return result
     else:
       return None
   except Exception:
     return None
 # 将远程文件下载到本地
 def GetRemoteFile(self, remote_path, local_path):
   try:
     self.sftp_obj.get(remote_path, local_path)
     return True
   except Exception:
     return False
 # 将本地文件上传到远程
 def PutLocalFile(self, localpath, remotepath):
   try:
     self.sftp_obj.put(localpath, remotepath)
     return True
   except Exception:
     return False
 # 关闭接口
 def CloseSSH(self):
   try:
     self.sftp_obj.close()
     self.ssh_obj.close()
   except Exception:
     pass
 # 获取文件大小
 def GetFileSize(self, file_path):
   ref = self.BatchCMD("du -s " + file_path + " | awk '{print $1}'")
   return ref.replace("\n", "")
 # 判断文件是否存在
 def IsFile(self, file_path):
   return self.BatchCMD("[ -e {} ] && echo 'True' || echo 'False'".format(file_path))
 # 获取系统型号
 def GetSystemVersion(self):
   return self.BatchCMD("uname")
 # 检测目标主机存活状态
 def GetPing(self):
   try:
     if self.GetSystemVersion() != None:
       return True
     else:
       return False
   except Exception:
     return False
 # 获取文件列表,并得到大小
 def GetFileList(self, path):
   try:
     ref_list = []
     self.sftp_obj.chdir(path)
     file_list = self.sftp_obj.listdir("./")
     for sub_path in file_list:
       dict = {}
       file_size = self.GetFileSize(path + sub_path)
       dict[path + sub_path] = file_size
       ref_list.append(dict)
     return ref_list
   except Exception:
     return False
 # 将远程文件全部打包后拉取到本地
 def GetTarPackageAll(self, path):
   try:
     file_list = self.sftp_obj.listdir(path)
     self.sftp_obj.chdir(path)
     for packageName in file_list:
       self.ssh_obj.exec_command("tar -czf /tmp/{0}.tar.gz {0}".format(packageName))
       self.sftp_obj.get("/tmp/{}.tar.gz".format(packageName), "./file/{}.tar.gz".format(packageName))
       self.sftp_obj.remove("/tmp/{}.tar.gz".format(packageName))
       return True
   except Exception:
     return True
 # 获取磁盘空间并返回字典
 def GetAllDiskSpace(self):
   ref_dict = {}
   cmd_dict = {"Linux\n": "df | grep -v 'Filesystem' | awk '{print $5 \":\" $6}'",
         "AIX\n": "df | grep -v 'Filesystem' | awk '{print $4 \":\" $7}'"
         }
   try:
     os_version = self.GetSystemVersion()
     for version, run_cmd in cmd_dict.items():
       if (version == os_version):
         # 根据不同版本选择不同的命令
         os_ref = self.BatchCMD(run_cmd)
         ref_list = os_ref.split("\n")
         # 循环将其转换为字典
         for each in ref_list:
           # 判断最后是否为空,过滤最后一项
           if each != "":
             ref_dict[str(each.split(":")[1])] = str(each.split(":")[0])
     return ref_dict
   except Exception:
     return False
 # 获取系统内存利用率
 def GetAllMemSpace(self):
   cmd_dict = {"Linux\n": "cat /proc/meminfo | head -n 2 | awk '{print $2}' | xargs | awk '{print $1 \":\" $2}'",
         "AIX\n": "svmon -G | grep -v 'virtual' | head -n 1 | awk '{print $2 \":\" $4}'"
         }
   try:
     os_version = self.GetSystemVersion()
     for version, run_cmd in cmd_dict.items():
       if (version == os_version):
         # 根据不同版本选择不同的命令
         os_ref = self.BatchCMD(run_cmd)
         # 首先现将KB转化为MB
         mem_total = math.ceil(int(os_ref.split(":")[0].replace("\n", "")) / 1024)
         mem_free = math.ceil(int(os_ref.split(":")[1].replace("\n", "")) / 1024)

# 计算占用空间百分比
         percentage = 100 - int(mem_free / int(mem_total / 100))
         # 拼接字典数据
         return {"Total": str(mem_total), "Free": str(mem_free), "Percentage": str(percentage)}
   except Exception:
     return False
 # 获取系统进程信息,并返回字典格式
 def GetAllProcessSpace(self):
   ref_dict = {}
   cmd_dict = {"Linux\n": "ps aux | grep -v 'USER' | awk '{print $2 \":\" $11}' | uniq",
         "AIX\n": "ps aux | grep -v 'USER' | awk '{print $2 \":\" $12}' | uniq"
         }
   try:
     os_version = self.GetSystemVersion()
     for version, run_cmd in cmd_dict.items():
       if (version == os_version):
         os_ref = self.BatchCMD(run_cmd)
         ref_list = os_ref.split("\n")
         for each in ref_list:
           if each != "":
             ref_dict[str(each.split(":")[0])] = str(each.split(":")[1])
     return ref_dict
   except Exception:
     return False
 # 获取CPU利用率
 def GetCPUPercentage(self):
   ref_dict = {}
   cmd_dict = {"Linux\n": "vmstat | tail -n 1 | awk '{print $13 \":\" $14 \":\" $15}'",
         "AIX\n": "vmstat | tail -n 1 | awk '{print $14 \":\" $15 \":\" $16}'"
         }
   try:
     os_version = self.GetSystemVersion()
     for version, run_cmd in cmd_dict.items():
       if (version == os_version):
         os_ref = self.BatchCMD(run_cmd)
         ref_list = os_ref.split("\n")
         for each in ref_list:
           if each != "":
             each = each.split(":")
             ref_dict = {"us": each[0],"sys":each[1],"idea":each[2]}
     return ref_dict
   except Exception:
     return False
 # 获取机器的负载情况
 def GetLoadAVG(self):
   ref_dict = {}
   cmd_dict = {"Linux\n": "uptime | awk '{print $10 \":\" $11 \":\" $12}'",
         "AIX\n": "uptime | awk '{print $10 \":\" $11 \":\" $12}'"
         }
   try:
     os_version = self.GetSystemVersion()
     for version, run_cmd in cmd_dict.items():
       if (version == os_version):
         os_ref = self.BatchCMD(run_cmd)
         ref_list = os_ref.split("\n")
         for each in ref_list:
           if each != "":
             each = each.replace(",","").split(":")
             ref_dict = {"1avg": each[0],"5avg": each[1],"15avg": each[2]}
             return ref_dict
     return False
   except Exception:
     return False
 # 修改当前用户密码
 def SetPasswd(self,username,password):
   try:
     os_id = self.BatchCMD("id | awk {'print $1'}")
     print(os_id)
     if(os_id == "uid=0(root)\n"):
       self.BatchCMD("echo '{}' | passwd --stdin '{}' > /dev/null".format(password,username))
       return True
   except Exception:
     return False

# 定义超类,集成基类MySSH
class SuperSSH(MySSH):
 def __init__(self,address, username, password, default_port):
   super(SuperSSH, self).__init__(address, username, password, default_port)

我们继续为上面的代码加上命令行,让其可以直接使用,这里需要遵循一定的格式规范,我们使用JSON解析数据,JSON格式如下.


{
"aix":
[
 ["192.168.1.1","root","123123"],
 ["192.168.1.1","root","2019"],
],
"suse":
[
 ["192.168.1.1","root","123123"],
],
"centos":
 [
 ["192.168.1.1","root","123123"],
 ]
}

接着是主程序代码,如下所示.


# -*- coding: utf-8 -*-
from MySSH import MySSH
import json,os,sys,argparse

class InitJson():
 def __init__(self,db):
   self.db_name = db
 def GetPlatform(self,plat):
   with open(self.db_name, "r", encoding="utf-8") as Read_Pointer:
     load_json = json.loads(Read_Pointer.read())
     for k,v in load_json.items():
       try:
         if k == plat:
           return v
       except Exception:
         return None
   return None

if __name__ == "__main__":
 ptr = InitJson("database.json")
 parser = argparse.ArgumentParser()

parser.add_argument("-G","--group",dest="group",help="指定主机组")
 parser.add_argument("-C","--cmd",dest="cmd",help="指定CMD命令")
 parser.add_argument("--get",dest="get",help="指定获取数据类型<ping>")
 parser.add_argument("--dst", dest="dst_file",help="目标位置")
 parser.add_argument("--src",dest="src_file",help="原文件路径")
 args = parser.parse_args()

# 批量CMD --group=aix --cmd=ls
 if args.group and args.cmd:
   platform = ptr.GetPlatform(args.group)
   success,error = [],[]
   for each in platform:
     ssh = MySSH(each[0], each[1], each[2], 22)
     if ssh.Init() != False:
       print("-" * 140)
       print("主机: {0:15} \t 账号: {1:10} \t 密码: {2:10} \t 命令: {3:30}".
          format(each[0], each[1], each[2], args.cmd))
       print("-" * 140)
       print(ssh.BatchCMD(args.cmd))
       ssh.CloseSSH()
       success.append(each[0])
     else:
       error.append(each[0])
       ssh.CloseSSH()
   print("\n\n","-" * 140,
      "\n 执行报告 \n",
      "-" * 140,
      "\n失败主机: {}\n".format(error),
      "-" * 140)

# 批量获取主机其他数据 --group=centos --get=ping
 if args.group and args.get:
     platform = ptr.GetPlatform(args.group)
     success, error = [], []
     for each in platform:
       ssh = MySSH(each[0], each[1], each[2], 22)
       # 判断是否为ping
       if ssh.Init() != False:
         if args.get == "ping":
           ret = ssh.GetPing()
           if ret == True:
             print("[*] 主机: {} 存活中.".format(each[0]))

# 收集磁盘数据
         elif args.get == "disk":
           print("-" * 140)
           print("主机: {0:15} \t 账号: {1:10} \t 密码: {2:10}".
              format(each[0], each[1], each[2]))
           print("-" * 140)

ret = ssh.GetAllDiskSpace()
           for k, v in ret.items():
             if (v.split("%")[0] != "-"):
               space_ret = int(v.split("%")[0])
               if space_ret >= 70:
                 print("磁盘分区: {0:30} \t 磁盘占用: {1:5} -> [警告]".format(k, v))
                 continue
               if space_ret >= 50:
                 print("磁盘分区: {0:30} \t 磁盘占用: {1:5} -> [警惕]".format(k, v))
                 continue
               else:
                 print("磁盘分区: {0:30} \t 磁盘占用: {1:5}".format(k, v))
                 continue
           print()
       else:
         error.append(each[0])
         ssh.CloseSSH()
     print("\n\n", "-" * 140,
        "\n 执行报告 \n",
        "-" * 140,
        "\n失败主机: {}\n".format(error),
        "-" * 140)

# 实现文件上传过程 --group=centos --src=./a.txt --dst=/tmp/test.txt
 if args.group and args.src_file and args.dst_file:
   platform = ptr.GetPlatform(args.group)
   success, error = [], []
   for each in platform:
     ssh = MySSH(each[0], each[1], each[2], 22)
     if ssh.Init() != False:
       ret = ssh.PutLocalFile(args.src_file,args.dst_file)
       if ret == True:
         print("主机: {} \t 本地文件: {} \t ---> 传到: {}".format(each[0], args.src_file,args.dst_file))
     else:
       error.append(each[0])
       ssh.CloseSSH()
   print("\n\n", "-" * 140,
      "\n 执行报告 \n",
      "-" * 140,
      "\n失败主机: {}\n".format(error),
      "-" * 140)

简单的使用命令:

远程CMD: python main.py --group=centos --cmd="free -h | grep -v 'total'"

Python如何实现Paramiko的二次封装

判断存活: python main.py --group=centos --get="ping"

Python如何实现Paramiko的二次封装

拉取磁盘:python main.py --group=suse --get="disk"

Python如何实现Paramiko的二次封装

批量上传文件: python main.py --group=suse --src="./aaa" --dst="/tmp/bbb.txt"

Python如何实现Paramiko的二次封装

由于我的设备少,所以没开多线程,担心开多线程对目标造成过大压力,也没啥必要。

番外: 另外我研究了一个主机分组的小工具,加上命令执行代码量才800行,实现了一个分组数据库,在这里记下使用方法。

默认运行进入一个交互式shell环境。

Init = 初始化json文件,ShowHostList=显示所有主机,ShowGroup=显示所有组,ShowAllGroup=显示所有主机包括组。

Python如何实现Paramiko的二次封装

添加修改与删除记录,命令如下。

Python如何实现Paramiko的二次封装

添加删除主机组。

Python如何实现Paramiko的二次封装

通过UUID向主机组中添加或删除主机记录。

Python如何实现Paramiko的二次封装

测试主机组连通性。

Python如何实现Paramiko的二次封装

来源:https://www.cnblogs.com/LyShark/p/14324971.html

标签:python,Paramiko,二次封装
0
投稿

猜你喜欢

  • PDO::lastInsertId讲解

    2023-06-11 14:31:37
  • Magic Photo Frame 神奇创意相框

    2009-09-15 20:45:00
  • PHP远程调试之XDEBUG

    2024-05-02 17:13:18
  • CentOS7服务器中apache、php7以及mysql5.7的安装配置代码

    2023-11-19 02:14:52
  • Numpy随机抽样的实现

    2022-06-24 07:18:02
  • 详解Go中Map类型和Slice类型的传递

    2024-04-23 09:47:03
  • 用XMlhttp生成html页面

    2007-08-29 19:49:00
  • 简单了解python shutil模块原理及使用方法

    2023-02-12 22:07:44
  • python 监控logcat关键字功能

    2022-06-13 13:36:17
  • 细线表格的处理

    2008-08-06 12:53:00
  • JavaScript队列函数和异步执行详解

    2024-04-22 13:26:21
  • python 实现在txt指定行追加文本的方法

    2021-09-08 22:14:40
  • 关于设计品质保证(DQA)的几点想法

    2007-11-16 16:55:00
  • Python request post上传文件常见要点

    2022-11-05 09:27:14
  • 基于java线程池读取单个SQL数据库表

    2024-01-25 08:17:00
  • Vue中使用webpack别名的方法实例详解

    2024-05-11 09:13:41
  • python掌握字符串只需这一篇就够了

    2023-01-09 03:39:10
  • 利用Python实现简单的Excel统计函数

    2021-09-27 09:21:09
  • 使用Python为中秋节绘制一块美味的月饼

    2023-06-30 17:36:10
  • TensorFlow自定义损失函数来预测商品销售量

    2023-01-08 07:01:51
  • asp之家 网络编程 m.aspxhome.com