Python封装zabbix-get接口的代码分享

作者:lyshark 时间:2021-12-05 08:57:39 

Zabbix 是一款强大的开源网管监控工具,该工具的客户端与服务端是分开的,我们可以直接使用自带的zabbix_get命令来实现拉取客户端上的各种数据,在本地组装参数并使用Popen开子线程执行该命令,即可实现批量监测。

封装Engine类: 该类的主要封装了Zabbix接口的调用,包括最基本的参数收集.

import subprocess,datetime,time,math

class Engine():
   def __init__(self,address,port):
       self.address = address
       self.port = port

def GetValue(self,key):
       try:
           command = "get.exe -s {0} -p {1} -k {2}".format(self.address,self.port,key).split(" ")
           start = datetime.datetime.now()
           process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
           while process.poll() is None:
               time.sleep(1)
               now = datetime.datetime.now()
               if (now - start).seconds > 2:
                   return 0
           return str(process.stdout.readlines()[0].split()[0],"utf-8")
       except Exception:
           return 0

# ping检测
   def GetPing(self):
       ref_dict = {"Address":0,"Ping":0}
       ref_dict["Address"] = self.address
       ref_dict["Ping"] = self.GetValue("agent.ping")
       if ref_dict["Ping"] == "1":
           return ref_dict
       else:
           ref_dict["Ping"] = "0"
           return ref_dict
       return ref_dict

# 获取主机组基本信息
   def GetSystem(self):
       ref_dict = { "Address" : 0 ,"HostName" : 0,"Uname":0 }
       ref_dict["Address"] = self.address
       ref_dict["HostName"] = self.GetValue("system.hostname")
       ref_dict["Uname"] = self.GetValue("system.uname")
       return ref_dict

# 获取CPU利用率
   def GetCPU(self):
       ref_dict = { "Address": 0 ,"Core": 0,"Active":0 , "Avg1": 0 ,"Avg5":0 , "Avg15":0 }
       ref_dict["Address"] = self.address
       ref_dict["Core"] = self.GetValue("system.cpu.num")
       ref_dict["Active"] = math.ceil(float(self.GetValue("system.cpu.util")))
       ref_dict["Avg1"] = self.GetValue("system.cpu.load[,avg1]")
       ref_dict["Avg5"] = self.GetValue("system.cpu.load[,avg5]")
       ref_dict["Avg15"] = self.GetValue("system.cpu.load[,avg15]")
       return ref_dict

# 获取内存利用率
   def GetMemory(self):
       ref_dict = { "Address":"0","Total":"0","Free":0,"Percentage":"0" }
       ref_dict["Address"] = self.address

fps = self.GetPing()
       if fps['Ping'] != "0":
           ref_dict["Total"] = self.GetValue("vm.memory.size[total]")
           ref_dict["Free"] = self.GetValue("vm.memory.size[free]")
           # 计算百分比: percentage = 100 - int(Free/int(Total/100))
           ref_dict["Percentage"] = str( 100 - int( int(ref_dict.get("Free")) / (int(ref_dict.get("Total"))/100)) ) + "%"
           return ref_dict
       else:
           return ref_dict

# 获取磁盘数据
   def GetDisk(self):
       ref_list = []

fps = self.GetPing()
       if fps['Ping'] != "0":
           disk_ = eval( self.GetValue("vfs.fs.discovery"))
           for x in range(len(disk_)):
               dict_ = {"Address": 0, "Name": 0, "Type": 0, "Free": 0}
               dict_["Address"] = self.address
               dict_["Name"] = disk_[x].get("{#FSNAME}")
               dict_["Type"] = disk_[x].get("{#FSTYPE}")
               if dict_["Type"] != "UNKNOWN":
                   pfree = self.GetValue("vfs.fs.size[\"{0}\",pfree]".format(dict_["Name"]))
                   dict_["Free"] = str(math.ceil(float(pfree)))
               else:
                   dict_["Free"] = -1
               ref_list.append(dict_)
           return ref_list
       return ref_list

# 获取进程状态
   def GetProcessStatus(self,process_name):
       fps = self.GetPing()
       dict_ = {"Address": '0', "ProcessName": '0', "ProcessCount": '0', "Status": '0'}
       if fps['Ping'] != "0":
           proc_id = self.GetValue("proc.num[\"{}\"]".format(process_name))
           dict_['Address'] = self.address
           dict_['ProcessName'] = process_name
           if proc_id != "0":
               dict_['ProcessCount'] = proc_id
               dict_['Status'] = "True"
           else:
               dict_['Status'] = "False"
           return dict_
       return dict_

# 获取端口开放状态
   def GetNetworkPort(self,port):
       dict_ = {"Address": '0', "Status": 'False'}
       dict_['Address'] = self.address
       fps = self.GetPing()
       if fps['Ping'] != "0":
           port_ = self.GetValue("net.tcp.listen[{}]".format(port))
           if port_ == "1":
               dict_['Status'] = "True"
           else:
               dict_['Status'] = "False"
           return dict_
       return dict_

# 检测Web服务器状态 通过本地地址:端口 => 检测目标地址:端口
   def CheckWebServerStatus(self,check_addr,check_port):
       dict_ = {"local_address": "0", "remote_address": "0", "remote_port": "0", "Status":"False"}
       fps = self.GetPing()
       dict_['local_address'] = self.address
       dict_['remote_address'] = check_addr
       dict_['remote_port'] = check_port
       if fps['Ping'] != "0":
           check_ = self.GetValue("net.tcp.port[\"{}\",\"{}\"]".format(check_addr,check_port))
           if check_ == "1":
               dict_['Status'] = "True"
           else:
               dict_['Status'] = "False"
           return dict_
       return dict_

当我们需要使用时,只需要定义变量调用即可,其调用代码如下。

from engine import Engine

if __name__ == "__main__":
   ptr_windows = Engine("127.0.0.1","10050")
   ret = ptr_windows.GetDisk()
   if len(ret) != 0:
       for item in ret:
           addr = item.get("Address")
           name = item.get("Name")
           type = item.get("Type")
           space = item.get("Free")
           if type != "UNKNOWN" and space != -1:
               print("地址: {} --> 盘符: {} --> 格式: {} --> 剩余空间: {}".format(addr,name,type,space))

来源:https://www.cnblogs.com/LyShark/p/16512849.html

标签:Python,zabbix-get
0
投稿

猜你喜欢

  • 用Python制作mini翻译器的实现示例

    2021-07-26 02:48:09
  • Python3模拟登录操作实例分析

    2022-01-18 13:36:26
  • Asp 操作Cookies(包括设置[赋值]、读取、删除[设置过期时间])

    2011-03-10 11:06:00
  • 推荐一款高效的python数据框处理工具Sidetable

    2022-07-22 04:34:25
  • 深入浅析Python获取对象信息的函数type()、isinstance()、dir()

    2021-12-23 11:46:11
  • Python去除字符串两端空格的方法

    2023-06-14 23:15:40
  • Python 两个列表的差集、并集和交集实现代码

    2021-12-26 18:11:01
  • Windows下Python3.6安装第三方模块的方法

    2022-07-18 19:08:23
  • python3.8+django2+celery5.2.7环境准备(python测试开发django)

    2022-08-19 06:17:14
  • 详解MySql自连接,外连接,内连接 ,左连接,右连接

    2024-01-25 05:40:52
  • ajax取消挂起请求的处理方法

    2023-11-20 23:41:53
  • python练习之曾经很火的小人画爱心表白代码

    2023-03-16 19:11:50
  • Python使用scapy模块发包收包

    2021-04-26 16:32:12
  • python3.6根据m3u8下载mp4视频

    2021-05-22 00:20:03
  • js replace()去除代码中空格的实例

    2024-04-29 13:36:26
  • 自定义Django Form中choicefield下拉菜单选取数据库内容实例

    2024-01-25 09:02:02
  • Python基于Twilio及腾讯云实现国际国内短信接口

    2021-05-28 22:38:51
  • vite+vue3中使用mock模拟数据问题

    2024-04-28 09:27:56
  • 在CMD中操作mysql数据库出现中文乱码解决方案

    2024-01-19 10:38:03
  • vue.js 自定义指令(拖拽、拖动、移动) 指令 v-drag详解

    2024-05-28 15:55:24
  • asp之家 网络编程 m.aspxhome.com