1
+ #!/bin/python
2
+ # encoding: utf-8
3
+ #================================================================================
4
+ #
5
+ # findJobKeys.py
6
+ #
7
+ # © Copyright IBM Corporation 2015-2018. All Rights Reserved
8
+ #
9
+ # This program is licensed under the terms of the Eclipse Public License
10
+ # v1.0 as published by the Eclipse Foundation and available at
11
+ # http://www.eclipse.org/legal/epl-v10.html
12
+ #
13
+ # U.S. Government Users Restricted Rights: Use, duplication or disclosure
14
+ # restricted by GSA ADP Schedule Contract with IBM Corp.
15
+ #
16
+ #================================================================================
17
+
18
+ import argparse
19
+ import sys
20
+ import os
21
+ from elasticsearch import Elasticsearch
22
+ from elasticsearch .serializer import JSONSerializer
23
+
24
+
25
TARGET_ENV = 'CAST_ELASTIC'


def _hit_count(total):
    """Return the number of hits reported by a search response as an int.

    Elasticsearch releases before 7.0 report ``hits.total`` as a plain
    integer; 7.0 and later report an object shaped like
    ``{"value": N, "relation": "eq"}``. Both forms are accepted.
    """
    if isinstance(total, dict):
        return total["value"]
    return total


def main(args):
    """Count log entries matching keywords during the run time of a job.

    The job's allocation record is looked up in the ``cast-allocation``
    index. Its begin/end time and compute node list are then used to build
    a Lucene query that is run against all indices, and the number of
    matching entries is printed.

    Args:
        args: Argument vector in ``sys.argv`` form (program name at index
            0). When it is empty or None the process arguments are parsed
            instead.

    Returns:
        None on success, 2 when no Elasticsearch target could be determined,
        3 when the allocation lookup did not return exactly one hit.
    """
    # Specify the arguments.
    parser = argparse.ArgumentParser(
        description='''A tool for finding keywords during the run time of a job.''')

    # type=int keeps the ids numeric so the "> 0" test below and the JSON
    # query body behave the same for defaults and for user supplied values.
    parser.add_argument('-a', '--allocationid', metavar='int', dest='allocation_id',
                        type=int, default=-1,
                        help='The allocation ID of the job.')
    parser.add_argument('-j', '--jobid', metavar='int', dest='job_id',
                        type=int, default=-1,
                        help='The job ID of the job.')
    parser.add_argument('-s', '--jobidsecondary', metavar='int', dest='job_id_secondary',
                        type=int, default=0,
                        help='The secondary job ID of the job (default : 0).')
    parser.add_argument('-t', '--target', metavar='hostname:port', dest='target', default=None,
                        help='An Elasticsearch server to be queried. This defaults to the contents of environment variable "CAST_ELASTIC".')
    parser.add_argument('-k', '--keywords', metavar='key', dest='keywords', nargs='*', default=['*'],
                        help='A list of keywords to search for in the Big Data Store (default : *).')
    parser.add_argument('-H', '--hostnames', metavar='host', dest='hosts', nargs='*', default=None,
                        help='A list of hostnames to filter the results to ')

    # Honour the argument vector handed in by the caller; passing None makes
    # argparse fall back to sys.argv[1:].
    cli_args = list(args[1:]) if args else None
    opts = parser.parse_args(cli_args)

    # If the target wasn't specified check the environment for the target
    # value, printing help on failure.
    target = opts.target
    if target is None:
        target = os.environ.get(TARGET_ENV)
    if target is None:
        parser.print_help()
        print("Missing target, '%s' was not set." % TARGET_ENV)
        return 2

    # Open a connection to the elastic cluster; if this fails something is
    # wrong on the server.
    es = Elasticsearch(
        target,
        sniff_on_start=True,
        sniff_on_connection_fail=True,
        sniffer_timeout=60
    )

    # Build the query to get the time range.
    if opts.allocation_id > 0:
        clauses = [{"match": {"data.allocation_id": opts.allocation_id}}]
    else:
        # NOTE(review): "should" ORs the primary and secondary job id
        # clauses; confirm that an AND ("must") is not what was intended.
        clauses = [
            {"match": {"data.primary_job_id": opts.job_id}},
            {"match": {"data.secondary_job_id": opts.job_id_secondary}},
        ]
    tr_query = {"query": {"bool": {"should": clauses}}}

    # Execute the query on the cast-allocation index.
    tr_res = es.search(
        index="cast-allocation",
        body=tr_query
    )
    total_hits = _hit_count(tr_res["hits"]["total"])

    print("Got {0} Hit(s) for specified job, searching for keywords.".format(total_hits))
    if total_hits != 1:
        print("This implementation only supports queries where the hit count is equal to 1.")
        return 3

    # TODO make this code more fault tolerant
    tr_data = tr_res["hits"]["hits"][0]["_source"]["data"]

    # ---------------------------------------------------------------------------------------------

    # Build the hostnames string. An absent or empty -H list falls back to
    # the nodes recorded for the allocation, avoiding an empty "()" group.
    hosts = opts.hosts or tr_data["compute_nodes"]
    hostnames = "hostname:({0})".format(" OR ".join(hosts))

    # ---------------------------------------------------------------------------------------------

    # Determine the timerange:
    start_time = '"{0}Z"'.format(tr_data["begin_time"])
    # If a history is present the job has an end_time, otherwise the range
    # is left open ended.
    if "history" in tr_data:
        end_time = '"{0}Z"'.format(tr_data["history"]["end_time"])
    else:
        end_time = "*"
    timerange = '''@timestamp:[{0} TO {1}]'''.format(start_time, end_time)

    # ---------------------------------------------------------------------------------------------

    # Build the message query. An empty -k list falls back to the wildcard
    # so the field clause is never left without a value.
    keywords = opts.keywords or ['*']
    message = "message:{0}".format(",".join(keywords))

    # ---------------------------------------------------------------------------------------------

    # Submit the query, this is lucene syntax.
    keyword_query = "{0} AND {1} AND {2}".format(message, timerange, hostnames)
    print(keyword_query)
    key_res = es.search(
        index="_all",
        q=keyword_query
    )

    print("Got %d keyword hits." % _hit_count(key_res['hits']['total']))
128
+
129
+
130
+
131
# Script entry point: pass the raw argument vector to main() and use its
# return value as the process exit status.
if __name__ == "__main__":
    exit_status = main(sys.argv)
    sys.exit(exit_status)
0 commit comments