分类: Python/Ruby
2012-03-02 18:13:01
python snmp 自动化3-修改python的netsnmp库
#2012-03-02 磁针石
#承接软件自动化实施与培训 验证码破解 软件破解 脚本开发 测试和python培训等
#gtalk: ouyangchongwu#gmail.com qq 37391319 博客:testing.blog.chinaunix.net
#版权所有,转载刊登请来函联系
#python qq group: 深圳自动化测试python群:113938272
#武冈深圳qq群:66250781 都梁深圳湖南户外群:49494279
#参考资料下面我们对client.py进行修改,去掉其中的函数部分,并为Session增加一些方法。以实现更加方便的访问:
初始化时,增加3个字典:
self.tid2OidDict = pickle.load(open(r"/usr/local/mib/tid2OidDict.txt"))
self.tidAliasDict = pickle.load(open(r"/usr/local/mib/tidAliasDict.txt"))
self.tidTypeDict = pickle.load(open(r"/usr/local/mib/tidTypeDict.txt"))
增加列表转换方法:
def convert_args_to_list(self, args):
"""
args is a list of tuple, for example:
[
('ntpClientEnabled', '0', 'false'),
('cmEthernetNetPortOamEnabled', '1.1.1.1', 'false'),
('ecpaControlDuration', '1.1.1.1', '25')
]
"""
var_list = VarList()
index = ''
snmpValue = snmpType = None
for arg in args:
tid = arg[0]
snmpOid = self.get_oid_from_tid(tid)
length = len(arg)
if length >1:
index = arg[1]
#arg with tid index value or more
if len(arg) >2:
value = arg[2]
if tid in self.tidAliasDict:
snmpValue = self.convert_alias_to_value(tid,value)
else:
snmpValue = value
snmpType = self.get_type_from_tid(tid)
if len(arg) >3:
snmpType = arg[3]
var = Varbind(snmpOid,index,snmpValue,snmpType)
var_list.append(var)
return var_list
增加把别名转换为实际值的方法:
def convert_alias_to_value(self, tid,value):
if tid in self.tidAliasDict:
snmpValue = self.tidAliasDict[tid][value]
else:
snmpValue = value
return snmpValue
增加根据tid取得类型的方法:
def get_type_from_tid(self, tid):
return self.tidTypeDict[tid]
增加根据tid获取oid的方法:
def get_oid_from_tid(self, tid):
return self.tid2OidDict[tid]
其他函数也有少许修改。不一一列出:
这样,上次的创建和删除保护组就可以简化成:
import netsnmp
import ecpa
session = netsnmp.Session(Version=2,DestHost='172.23.192.44',Community='private')
resultList = session.set([
('cmFacProtGroupSwitchMode', '1.1.1.1','oneplusone'),
('cmFacProtGroupWorkPort', '1.1.1.1',session.get_oid_from_tid('cmEthernetNetPortIndex') + '.1.1.1.1'),
('cmFacProtGroupProtPort', '1.1.1.1',session.get_oid_from_tid('cmEthernetNetPortIndex') + '.1.1.1.2'),
('cmFacProtGroupRowStatus', '1.1.1.1','createAndGo')
])
print resultList
resultList = session.set([
('cmFacProtGroupRowStatus', '1.1.1.1','destroy')
])
print resultList
我们可以对具体的业务进行测试,定义如下的ecpa.py
"""
This file description ECPA operation in SNMP
"""
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Function: Ssh to remote server
# Author: Andrew Xu
# CreateDate: 2012/03/01
# $Id: $
# __version__ = $Revision: $
import netsnmp
class ecpaSnmp(object):
def __init__(self, snmpObj):
self.client = snmpObj
def set_ecpa_stream(self, index, **args):
keyDict = {
'index': 'ecpaConfigStreamIndex',
'name': 'ecpaConfigStreamName',
'size': 'ecpaConfigStreamFrameSize',
'rate': 'ecpaConfigStreamRate',
'payloadType': 'ecpaConfigStreamPayloadType',
'destinationMAC': 'ecpaConfigStreamDestinationMAC',
'sourceMAC': 'ecpaConfigStreamSourceMAC',
'outerVlanEnabled': 'ecpaConfigStreamOuterVlanEnabled',
'outerVlanId': 'ecpaConfigStreamOuterVlanId',
'outerVlanPrio': 'ecpaConfigStreamOuterVlanPrio',
'outerVlanEtherType': 'ecpaConfigStreamOuterVlanEtherType',
'innerVlanEnabled': 'ecpaConfigStreamInnerVlanEnabled',
'innerVlanId': 'ecpaConfigStreamInnerVlanId',
'innerVlanPrio': 'ecpaConfigStreamInnerVlanPrio',
'innerVlanEtherType': 'ecpaConfigStreamInnerVlanEtherType',
'ipVersion': 'ecpaConfigStreamIpVersion',
'ipV4Address': 'ecpaConfigStreamIpV4Address',
'ipV6Address': 'ecpaConfigStreamIpV6Address',
'prioMapMode': 'ecpaConfigStreamPrioMapMode',
'prioVal': 'ecpaConfigStreamPrioVal',
'innerVlan2Enabled': 'ecpaConfigStreamInnerVlan2Enabled',
'innerVlan2Id': 'ecpaConfigStreamInnerVlan2Id',
'innerVlan2Prio': 'ecpaConfigStreamInnerVlan2Prio',
'innerVlan2EtherType': 'ecpaConfigStreamInnerVlan2EtherType',
'destIpV4Address': 'ecpaConfigStreamDestIpV4Address',
'destIpV6Address': 'ecpaConfigStreamDestIpV6Address',
'usePortSourceMAC': 'ecpaConfigStreamUsePortSourceMAC'
}
setList = []
for arg in args:
snmpName = keyDict[arg]
snmpValue = args[arg]
print snmpName, index, snmpValue
setList.append((snmpName, index, snmpValue))
return self.client.set(setList)
就可以通过如下方式调用刚才的库:
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Function: Ssh to remote server
# Author: Andrew Xu
# CreateDate: 2012/02/28
import netsnmp
import ecpa
session = netsnmp.Session(Version=2,DestHost='172.23.192.44',Community='private')
#print session.tidAliasDict
ecpaExample = ecpa.ecpaSnmp(session)
resultList = ecpaExample.set_ecpa_stream(1,rate='96000',name='test',size='98')
print resultList
现在还存在的问题有:walk只返回值,没有oid,这些问题留到下周解决。walk的示例如下:
resultList = session.walk([
('cmEthernetAccPortEntry'),
])
返回的错误还没有捕捉。添加如下函数:
def get_snmp_error(self):
var_list = self.convert_args_to_list([('lastSetErrorInformation','0')])
res = client_intf.get(self, var_list)
return res
并对返回进行控制:
if res != 1:
return self.get_snmp_error()
else:
return res
现在执行出错时就会有错误报出:
# ./test.py
('Error: 257 - Entity already exists. (PROT GROUP-1-1-1-1)',)