-
Notifications
You must be signed in to change notification settings - Fork 2
/
check_soap
116 lines (109 loc) · 6.79 KB
/
check_soap
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#!/usr/bin/python3.5
# coding: utf-8
#########################################################################################
# Clever Net Systems [~] #
# Clément Hampaï <[email protected]> #
# Centreon check_soap plugin #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <http://www.gnu.org/licenses/>. #
#########################################################################################
import requests, argparse, time, sys
from requests.auth import HTTPBasicAuth
# Module-level timing slots, both starting at 0.0.
# end_time is overwritten by post_query() (declared ``global`` there) with the
# duration of the HTTP call and is then handed to check_result().
# The module-level start_time is not reassigned anywhere in this file.
start_time = 0.0
end_time = 0.0
# Args management --------------------------------------------------------------
def handle_args():
    """Parse the plugin's command-line options.

    Required options: --url, --data, --warning, --critical.
    Optional options (None when omitted): --success, --srv-cert,
    --client-cert, --client-cert-key, --basic-auth-user,
    --basic-auth-password.

    All values are returned as strings. The thresholds are converted with
    float() by check_result() and compared against an elapsed time measured
    in seconds, so the help text advertises seconds.

    Returns:
        argparse.Namespace holding the parsed options (dashes in option
        names become underscores, e.g. ``args.srv_cert``).
    """
    parser = argparse.ArgumentParser(description='[~] check_soap Centreon plugin')

    # Mandatory request definition.
    parser.add_argument('--url', help='url of the soap service', required=True)
    parser.add_argument('--data', help='xml data to post to the soap service', required=True)

    # Optional content check on the answer body.
    parser.add_argument('--success', help='string to match in the service answer for a valid query', required=False)

    # Optional TLS settings.
    parser.add_argument('--srv-cert', help='path to a CA_BUNDLE file or directory with certificates of trusted CAs', required=False)
    parser.add_argument('--client-cert', help='local cert for client side certificate', required=False)
    parser.add_argument('--client-cert-key', help='specify a local cert key for client side certificate', required=False)

    # Optional HTTP basic authentication.
    parser.add_argument('--basic-auth-user', help='specify a basic authentication user', required=False)
    parser.add_argument('--basic-auth-password', help='specify a basic authentication password', required=False)

    # Execution-time thresholds, expressed in seconds.
    parser.add_argument('--warning', help='exec time in [s] for warning threshold', required=True)
    parser.add_argument('--critical', help='exec time in [s] for critical threshold', required=True)

    return parser.parse_args()
#-------------------------------------------------------------------------------
# HTTP query -------------------------------------------------------------------
def post_query(args):
    """POST the SOAP payload to the service and record how long the call took.

    The request is sent to ``args.url`` with ``args.data`` as body and a
    ``text/xml`` content type. Optional settings are applied as follows:

    * client certificate: used only when both ``args.client_cert`` and
      ``args.client_cert_key`` are given;
    * server verification: ``args.srv_cert`` is used as CA bundle when given;
    * basic auth: used only when both ``args.basic_auth_user`` and
      ``args.basic_auth_password`` are given.

    Side effect: the module-level ``end_time`` is set to the duration of the
    HTTP call, in seconds.

    Returns:
        The response object returned by ``requests.post``.

    On any error while sending the request, an error line is printed and the
    process exits with status 3.
    """
    global end_time

    has_client_cert = args.client_cert is not None and args.client_cert_key is not None
    has_basic_auth = args.basic_auth_user is not None and args.basic_auth_password is not None

    # Only the options that apply are passed, so requests keeps its own
    # defaults for everything else.
    options = {}
    if has_client_cert:
        options['cert'] = (args.client_cert, args.client_cert_key)
        # NOTE(review): with a client certificate but no --srv-cert, server
        # certificate verification is switched off. This is the existing
        # behaviour and is kept for compatibility; confirm it is intended.
        options['verify'] = args.srv_cert if args.srv_cert is not None else False
    elif args.srv_cert is not None:
        options['verify'] = args.srv_cert
    if has_basic_auth:
        options['auth'] = HTTPBasicAuth(args.basic_auth_user, args.basic_auth_password)

    try:
        # NOTE(review): no timeout is passed, so a stalled server blocks the
        # check until the caller kills it.
        # A monotonic clock keeps the measurement immune to wall-clock jumps.
        started = time.monotonic()
        response = requests.post(args.url, data=args.data, headers={'content-type': 'text/xml'}, **options)
        end_time = time.monotonic() - started
        return response
    except Exception:
        # Exception (not a bare except) so Ctrl-C and SystemExit still propagate.
        print ("SOAP error - bad service url or query|"+display_graph(0, args, 0))
        sys.exit(3)
#-------------------------------------------------------------------------------
# Arrange http query result ----------------------------------------------------
def check_result(args, response, time):
    """Turn the HTTP answer and its duration into a plugin status and exit.

    Args:
        args: parsed options; reads ``success``, ``warning`` and ``critical``.
        response: object exposing ``text`` and ``status_code``.
        time: duration of the request in seconds.

    The answer is considered good when the HTTP status is 200 and, if
    ``args.success`` is set, that string occurs in ``response.text``.

    Exit status (one line with status text and graph data is printed first):
        0 - good answer, duration not above either threshold
        1 - good answer, duration above ``args.warning``
        2 - good answer, duration above ``args.critical``, or bad answer

    A duration exactly equal to a threshold does not trigger that alert.
    """
    exectime = round(time, 3)
    elapsed = float(time)

    # --success is optional: test for "not provided" first, because
    # ``None in response.text`` raises TypeError.
    answer_matches = not args.success or args.success in response.text

    if not (answer_matches and response.status_code == 200):
        print ("SOAP critical - bad service answer - http "+str(response.status_code)+"|"+display_graph(exectime, args, 0))
        sys.exit(2)
    elif elapsed > float(args.critical):
        print (display_info("critical", exectime)+"|"+display_graph(exectime, args, 0))
        sys.exit(2)
    elif elapsed > float(args.warning):
        print (display_info("warning", exectime)+"|"+display_graph(exectime, args, 0))
        sys.exit(1)
    else:
        # Reached for durations at or below both thresholds; the boundary
        # values used to fall through to the "bad service answer" branch.
        print (display_info("OK", exectime)+"|"+display_graph(exectime, args, 1))
        sys.exit(0)
def display_info(result_type, exectime):
    """Build the human-readable status text, e.g. ``SOAP OK - exec time 0.1[s]``."""
    template = "SOAP {!s} - exec time {!s}[s]"
    return template.format(result_type, exectime)
def display_graph(exectime, args, ok):
    """Build the graph data appended after the ``|`` of the status line.

    Emits the execution time followed by ``args.warning`` and
    ``args.critical``, then an ``ok`` flag.
    """
    pieces = (exectime, args.warning, args.critical, ok)
    return "time={!s}s;{!s};{!s}; ok={!s}".format(*pieces)
#-------------------------------------------------------------------------------
# MAIN -------------------------------------------------------------------------
# Run the check only when executed as a script, so importing this file does
# not parse argv, send a request or exit the interpreter.
if __name__ == "__main__":
    args = handle_args()
    rep = post_query(args)
    # post_query() stores the request duration in the module-level end_time.
    check_result(args, rep, end_time)
# ------------------------------------------------------------------------------