@@ -984,6 +984,10 @@ def admf_influxdb_jobmon(firstTIS, limitTIS, siteDict, fsssDict):
     # cores, the tags of interest.                                           #
     # ####################################################################### #
     URL_INFLUXDB = "https://monit-grafana.cern.ch/api/datasources/proxy/7731/query?db=monit_production_cmsjm&q=SELECT%%20SUM%%28wavg_count%%29%%20FROM%%20%%22long%%22.%%22condor_1d%%22%%20WHERE%%20%%22Status%%22%%20=%%20%%27Running%%27%%20AND%%20time%%20%%3E=%%20%ds%%20and%%20time%%20%%3C%%20%ds%%20GROUP%%20BY%%20%%22RequestCpus%%22%%2C%%20%%22Site%%22"
+    # ----------------------------------------------------------------------
+    # urllib.parse.unquote(URL_INFLUXDB % (123456789, 987654321))
+    # 'https://monit-grafana.cern.ch/api/datasources/proxy/7731/query?db=monit_production_cmsjm&q=SELECT SUM(wavg_count) FROM "long"."condor_1d" WHERE "Status" = \'Running\' AND time >= 123456789s and time < 987654321s GROUP BY "RequestCpus", "Site"'
+    # ----------------------------------------------------------------------
     HDR_GRAFANA = {'Authorization': "Bearer eyJrIjoiZWRnWXc1bUZWS0kwbWExN011TGNTN2I2S1JpZFFtTWYiLCJuIjoiY21zLXNzYiIsImlkIjoxMX0=", 'Content-Type': "application/x-www-form-urlencoded; charset=UTF-8", 'Accept': "application/json"}
     #
     first15m = int(firstTIS / 86400) * 96
@@ -1085,6 +1089,157 @@ def admf_influxdb_jobmon(firstTIS, limitTIS, siteDict, fsssDict):



+def admf_grafana_jobmon(firstTIS, limitTIS, siteDict, fsssDict):
+    """sum up CPU usage from MonIT/ElasticSearch and return a site list"""
+    # ####################################################################### #
+    # fetch the summed-up core usage (cores times count) between firstTIS     #
+    # and limitTIS from MonIT/ElasticSearch and return a list of sites        #
+    # that provided 100 cores or more of CPU during that period.              #
+    # CMS job monitoring information in ElasticSearch is aggregated from      #
+    # HTCondor 12 minute job snapshots retaining tags. We thus have to        #
+    # aggregate over the tags that are not of interest and sum the            #
+    # product of number-of-cores and usage for each site.                     #
+    # ####################################################################### #
+    URL_GRAFANA = "https://monit-grafana.cern.ch/api/datasources/proxy/9475/_msearch"
+    HDR_GRAFANA = {'Authorization': "Bearer eyJrIjoiZWRnWXc1bUZWS0kwbWExN011TGNTN2I2S1JpZFFtTWYiLCJuIjoiY21zLXNzYiIsImlkIjoxMX0=", 'Content-Type': "application/json; charset=UTF-8", 'Accept': "application/json"}
+    #
+    first15m = int(firstTIS / 86400) * 96
+    limit15m = int(limitTIS / 86400) * 96
+    if (first15m >= limit15m):
+        logging.critical("Empty time interval for sites to provide computing")
+        return []
+    #
+    logging.info("Querying ElasticSearch about job core usage via Grafana")
+    logging.log(15, "   between %s and %s" %
+                    (time.strftime("%Y-%m-%d", time.gmtime(first15m * 900)),
+                     time.strftime("%Y-%m-%d", time.gmtime((limit15m * 900) - 1))))
+
+
+    # prepare Lucene ElasticSearch query:
+    # ===================================
+    queryString = ("{\"search_type\":\"query_then_fetch\",\"ignore_unavail" +
+                   "able\":true,\"index\":[\"monit_prod_condor_agg_metric*" +
+                   "\"]}\n{\"query\":{\"bool\":{\"must\":[{\"match_phrase\"" +
+                   ":{\"data.Status\":\"Running\"}}],\"filter\":{\"range\"" +
+                   ":{\"metadata.timestamp\":{\"gte\":%d,\"lt\":%d,\"forma" +
+                   "t\":\"epoch_second\"}}}}},\"size\":0,\"aggs\":{\"coreho" +
+                   "urs_per_site\":{\"terms\":{\"field\":\"data.Site\",\"si" +
+                   "ze\":512},\"aggs\":{\"corehours_of_entry\":{\"sum\":{\"" +
+                   "script\":{\"lang\":\"painless\",\"source\":\"doc['data." +
+                   "RequestCpus'].value * doc['data.wavg_count'].value\"}}" +
+                   "}}}}}\n") % (first15m * 900, limit15m * 900)
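+    # ----------------------------------------------------------------------
+    # illustrative layout of the two-line _msearch payload above, expanded
+    # here for readability (index header line, then the actual query):
+    # {"search_type":"query_then_fetch","ignore_unavailable":true,
+    #  "index":["monit_prod_condor_agg_metric*"]}
+    # {"query":{"bool":{"must":[{"match_phrase":{"data.Status":"Running"}}],
+    #      "filter":{"range":{"metadata.timestamp":{"gte":...,"lt":...,
+    #          "format":"epoch_second"}}}}},
+    #  "size":0,
+    #  "aggs":{"corehours_per_site":{"terms":{"field":"data.Site","size":512},
+    #      "aggs":{"corehours_of_entry":{"sum":{"script":{"lang":"painless",
+    #          "source":"doc['data.RequestCpus'].value *
+    #                        doc['data.wavg_count'].value"}}}}}}}
+    # ----------------------------------------------------------------------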
+
+
+    # execute query and receive results from ElasticSearch:
+    # =====================================================
+    try:
+        requestObj = urllib.request.Request(URL_GRAFANA,
+                                            data=queryString.encode("utf-8"),
+                                            headers=HDR_GRAFANA, method="POST")
+        with urllib.request.urlopen(requestObj, timeout=600) as responseObj:
+            urlCharset = responseObj.headers.get_content_charset()
+            if urlCharset is None:
+                urlCharset = "utf-8"
+            myData = responseObj.read().decode(urlCharset)
+            del urlCharset
+        #
+        # sanity check:
+        if (len(myData) < 1024):
+            raise ValueError("Job core usage data failed sanity check")
+        #
+        # decode JSON:
+        myJson = json.loads(myData)
+        del myData
+        #
+    except (urllib.error.URLError, ValueError) as excptn:
+        logging.error("Failed to query ElasticSearch via Grafana, %s" %
+                      str(excptn))
+        return []
+
+
+    # loop over results and integrate core usage by site:
+    # ===================================================
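+    # the parsing below assumes the standard ElasticSearch aggregation
+    # response layout, sketched here with a hypothetical bucket entry:
+    # {"responses": [{"aggregations": {"corehours_per_site": {"buckets":
+    #     [{"key": "T1_DE_KIT", "corehours_of_entry": {"value": 98765.4}},
+    #      ...]}}}]}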
+    integrationDict = {}
+    for myRspns in myJson['responses']:
+        for myBuckt in myRspns['aggregations']['corehours_per_site']['buckets']:
+            try:
+                mySite = myBuckt['key']
+                try:
+                    myFacility = siteDict[mySite]
+                except KeyError:
+                    continue
+                myFsss = myFacility + "___" + mySite
+                if (myFsss not in fsssDict):
+                    myFsss = myFacility
+                if (myFsss not in fsssDict):
+                    continue
+                myUsage = myBuckt['corehours_of_entry']['value']
+                if (myFsss in integrationDict):
+                    integrationDict[myFsss] += myUsage
+                else:
+                    integrationDict[myFsss] = myUsage
+            except KeyError as excptn:
+                logging.warning("Bad query result entry, skipping, %s" %
+                                str(excptn))
+                continue
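+    # average cores of an fsss entry: summed core-hours divided by hours in
+    # the period, i.e. (limit15m - first15m) 15-minute bins at four per hour;
+    # entries below the 100 core threshold are rolled up into their parent,
+    # e.g. a hypothetical "FNAL___T1_US_FNAL___T3_US_FNALLPC" adds its
+    # usage to "FNAL___T1_US_FNAL" (or to "FNAL"); the reverse sort puts
+    # subsite entries ahead of their parent site entries: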
+    ackSet = set()
+    myTime = (limit15m - first15m) / 4
+    for myFsss in sorted(integrationDict.keys(), reverse=True):
+        myCPU = integrationDict[myFsss] / myTime
+        logging.log(25, "Fsss %s provided %.1f CPU cores" % (myFsss, myCPU))
+        if (myCPU >= 100.0):
+            ackSet.add(fsssDict[myFsss])
+        else:
+            fsssList = myFsss.split("___")
+            if (len(fsssList) == 3):
+                parentFsss = fsssList[0] + "___" + fsssList[1]
+                if (parentFsss not in fsssDict):
+                    parentFsss = fsssList[0]
+            elif (len(fsssList) == 2):
+                parentFsss = fsssList[0]
+            else:
+                continue
+            if (parentFsss not in fsssDict):
+                continue
+            if (parentFsss in integrationDict):
+                integrationDict[parentFsss] += integrationDict[myFsss]
+            else:
+                integrationDict[parentFsss] = integrationDict[myFsss]
+
+
+    logging.info("   found %d fsss'es providing 100 cores or more" %
+                 len(ackSet))
+    #
+    return list(ackSet)
+# ########################################################################### #
+
+
+
 def admf_write_acknowledgement(quarterString, tupleList, filepath=None):
     """write computing acknowledgement LaTeX file"""
     # ####################################################################### #
@@ -1851,7 +1981,7 @@ def admf_make_tzlist():
     #
     # get list of sites contributing computing:
     # =========================================
-    compTuple = admf_influxdb_jobmon(frstDay, nextDay, siteDict, fsssDict)
+    compTuple = admf_grafana_jobmon(frstDay, nextDay, siteDict, fsssDict)
     #
     #
     tupleList = sorted(set(diskTuple + compTuple))