18
18
19
19
package org .apache .hadoop .yarn .server .resourcemanager .metrics ;
20
20
21
+ import java .util .ArrayList ;
21
22
import java .util .HashMap ;
22
23
import java .util .Map ;
24
+ import java .util .concurrent .ExecutorService ;
25
+ import java .util .concurrent .Executors ;
26
+ import java .util .concurrent .LinkedBlockingQueue ;
27
+ import java .util .concurrent .TimeUnit ;
23
28
24
29
import org .slf4j .Logger ;
25
30
import org .slf4j .LoggerFactory ;
32
37
import org .apache .hadoop .yarn .api .records .timeline .TimelineEntity ;
33
38
import org .apache .hadoop .yarn .api .records .timeline .TimelineEvent ;
34
39
import org .apache .hadoop .yarn .client .api .TimelineClient ;
40
+ import org .apache .hadoop .yarn .conf .YarnConfiguration ;
35
41
import org .apache .hadoop .yarn .event .EventHandler ;
36
42
import org .apache .hadoop .yarn .server .metrics .AppAttemptMetricsConstants ;
37
43
import org .apache .hadoop .yarn .server .metrics .ApplicationMetricsConstants ;
@@ -59,16 +65,92 @@ public TimelineServiceV1Publisher() {
59
65
}
60
66
61
67
private TimelineClient client ;
68
+ private LinkedBlockingQueue <TimelineEntity > entityQueue ;
69
+ private ExecutorService sendEventThreadPool ;
70
+ private int dispatcherPoolSize ;
71
+ private int dispatcherBatchSize ;
72
+ private int putEventInterval ;
73
+ private boolean isTimeLineServerBatchEnabled ;
74
+ private volatile boolean stopped = false ;
75
+ private PutEventThread putEventThread ;
76
+ private Object sendEntityLock ;
62
77
63
78
@ Override
64
79
protected void serviceInit (Configuration conf ) throws Exception {
80
+ isTimeLineServerBatchEnabled =
81
+ conf .getBoolean (
82
+ YarnConfiguration .RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED ,
83
+ YarnConfiguration .DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED );
84
+ if (isTimeLineServerBatchEnabled ) {
85
+ putEventInterval =
86
+ conf .getInt (YarnConfiguration .RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL ,
87
+ YarnConfiguration .DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL )
88
+ * 1000 ;
89
+ if (putEventInterval <= 0 ) {
90
+ throw new IllegalArgumentException (
91
+ "RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL should be greater than 0" );
92
+ }
93
+ dispatcherPoolSize = conf .getInt (
94
+ YarnConfiguration .RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE ,
95
+ YarnConfiguration .
96
+ DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE );
97
+ if (dispatcherPoolSize <= 0 ) {
98
+ throw new IllegalArgumentException (
99
+ "RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE should be greater than 0" );
100
+ }
101
+ dispatcherBatchSize = conf .getInt (
102
+ YarnConfiguration .RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE ,
103
+ YarnConfiguration .
104
+ DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE );
105
+ if (dispatcherBatchSize <= 1 ) {
106
+ throw new IllegalArgumentException (
107
+ "RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE should be greater than 1" );
108
+ }
109
+ putEventThread = new PutEventThread ();
110
+ sendEventThreadPool = Executors .newFixedThreadPool (dispatcherPoolSize );
111
+ entityQueue = new LinkedBlockingQueue <>(dispatcherBatchSize + 1 );
112
+ sendEntityLock = new Object ();
113
+ LOG .info ("Timeline service v1 batch publishing enabled" );
114
+ } else {
115
+ LOG .info ("Timeline service v1 batch publishing disabled" );
116
+ }
65
117
client = TimelineClient .createTimelineClient ();
66
118
addIfService (client );
67
119
super .serviceInit (conf );
68
120
getDispatcher ().register (SystemMetricsEventType .class ,
69
121
new TimelineV1EventHandler ());
70
122
}
71
123
124
+ protected void serviceStart () throws Exception {
125
+ if (isTimeLineServerBatchEnabled ) {
126
+ stopped = false ;
127
+ putEventThread .start ();
128
+ }
129
+ super .serviceStart ();
130
+ }
131
+
132
+ protected void serviceStop () throws Exception {
133
+ super .serviceStop ();
134
+ if (isTimeLineServerBatchEnabled ) {
135
+ stopped = true ;
136
+ putEventThread .interrupt ();
137
+ try {
138
+ putEventThread .join ();
139
+ SendEntity task = new SendEntity ();
140
+ if (!task .buffer .isEmpty ()) {
141
+ LOG .info ("Initiating final putEntities, remaining entities left in entityQueue: {}" ,
142
+ task .buffer .size ());
143
+ sendEventThreadPool .submit (task );
144
+ }
145
+ } finally {
146
+ sendEventThreadPool .shutdown ();
147
+ if (!sendEventThreadPool .awaitTermination (3 , TimeUnit .SECONDS )) {
148
+ sendEventThreadPool .shutdownNow ();
149
+ }
150
+ }
151
+ }
152
+ }
153
+
72
154
@ SuppressWarnings ("unchecked" )
73
155
@ Override
74
156
public void appCreated (RMApp app , long createdTime ) {
@@ -257,7 +339,7 @@ public void appAttemptRegistered(RMAppAttempt appAttempt,
257
339
@ SuppressWarnings ("unchecked" )
258
340
@ Override
259
341
public void appAttemptFinished (RMAppAttempt appAttempt ,
260
- RMAppAttemptState appAttemtpState , RMApp app , long finishedTime ) {
342
+ RMAppAttemptState appAttemptState , RMApp app , long finishedTime ) {
261
343
TimelineEntity entity =
262
344
createAppAttemptEntity (appAttempt .getAppAttemptId ());
263
345
@@ -274,7 +356,7 @@ public void appAttemptFinished(RMAppAttempt appAttempt,
274
356
eventInfo .put (AppAttemptMetricsConstants .FINAL_STATUS_INFO ,
275
357
app .getFinalApplicationStatus ().toString ());
276
358
eventInfo .put (AppAttemptMetricsConstants .STATE_INFO , RMServerUtils
277
- .createApplicationAttemptState (appAttemtpState ).toString ());
359
+ .createApplicationAttemptState (appAttemptState ).toString ());
278
360
if (appAttempt .getMasterContainer () != null ) {
279
361
eventInfo .put (AppAttemptMetricsConstants .MASTER_CONTAINER_INFO ,
280
362
appAttempt .getMasterContainer ().getId ().toString ());
@@ -374,23 +456,68 @@ private static TimelineEntity createContainerEntity(ContainerId containerId) {
374
456
}
375
457
376
458
private void putEntity (TimelineEntity entity ) {
377
- try {
459
+ if (isTimeLineServerBatchEnabled ) {
460
+ try {
461
+ entityQueue .put (entity );
462
+ if (entityQueue .size () > dispatcherBatchSize ) {
463
+ SendEntity task = null ;
464
+ synchronized (sendEntityLock ) {
465
+ if (entityQueue .size () > dispatcherBatchSize ) {
466
+ task = new SendEntity ();
467
+ }
468
+ }
469
+ if (task != null ) {
470
+ sendEventThreadPool .submit (task );
471
+ }
472
+ }
473
+ } catch (Exception e ) {
474
+ LOG .error ("Error when publishing entity batch [ " + entity .getEntityType () + ","
475
+ + entity .getEntityId () + " ] " , e );
476
+ }
477
+ } else {
478
+ try {
479
+ if (LOG .isDebugEnabled ()) {
480
+ LOG .debug ("Publishing the entity " + entity .getEntityId ()
481
+ + ", JSON-style content: "
482
+ + TimelineUtils .dumpTimelineRecordtoJSON (entity ));
483
+ }
484
+ client .putEntities (entity );
485
+ } catch (Exception e ) {
486
+ LOG .error ("Error when publishing entity [ " + entity .getEntityType () + ","
487
+ + entity .getEntityId () + " ] " , e );
488
+ }
489
+ }
490
+ }
491
+
492
+ private class SendEntity implements Runnable {
493
+
494
+ private ArrayList <TimelineEntity > buffer ;
495
+
496
+ SendEntity () {
497
+ buffer = new ArrayList ();
498
+ entityQueue .drainTo (buffer );
499
+ }
500
+
501
+ @ Override
502
+ public void run () {
378
503
if (LOG .isDebugEnabled ()) {
379
- LOG .debug ("Publishing the entity " + entity .getEntityId ()
380
- + ", JSON-style content: "
381
- + TimelineUtils .dumpTimelineRecordtoJSON (entity ));
504
+ LOG .debug ("Number of timeline entities being sent in batch: {}" , buffer .size ());
505
+ }
506
+ if (buffer .isEmpty ()) {
507
+ return ;
508
+ }
509
+ try {
510
+ client .putEntities (buffer .toArray (new TimelineEntity [0 ]));
511
+ } catch (Exception e ) {
512
+ LOG .error ("Error when publishing entity: " , e );
382
513
}
383
- client .putEntities (entity );
384
- } catch (Exception e ) {
385
- LOG .error ("Error when publishing entity [" + entity .getEntityType () + ","
386
- + entity .getEntityId () + "]" , e );
387
514
}
388
515
}
389
516
390
517
private class TimelineV1PublishEvent extends TimelinePublishEvent {
391
518
private TimelineEntity entity ;
392
519
393
- public TimelineV1PublishEvent (SystemMetricsEventType type ,
520
+ TimelineV1PublishEvent (SystemMetricsEventType type ,
394
521
TimelineEntity entity , ApplicationId appId ) {
395
522
super (type , appId );
396
523
this .entity = entity ;
@@ -408,4 +535,46 @@ public void handle(TimelineV1PublishEvent event) {
408
535
putEntity (event .getEntity ());
409
536
}
410
537
}
411
- }
538
+
539
+ private class PutEventThread extends Thread {
540
+ PutEventThread () {
541
+ super ("PutEventThread" );
542
+ }
543
+
544
+ @ Override
545
+ public void run () {
546
+ LOG .info ("System metrics publisher will put events every " +
547
+ String .valueOf (putEventInterval ) + " milliseconds" );
548
+ while (!stopped && !Thread .currentThread ().isInterrupted ()) {
549
+ if (System .currentTimeMillis () % putEventInterval >= 1000 ) {
550
+ try {
551
+ Thread .sleep (500 );
552
+ } catch (InterruptedException e ) {
553
+ LOG .warn (SystemMetricsPublisher .class .getName ()
554
+ + " is interrupted. Exiting." );
555
+ break ;
556
+ }
557
+ continue ;
558
+ }
559
+ SendEntity task = null ;
560
+ synchronized (sendEntityLock ) {
561
+ if (LOG .isDebugEnabled ()) {
562
+ LOG .debug ("Creating SendEntity task in PutEventThread" );
563
+ }
564
+ task = new SendEntity ();
565
+ }
566
+ if (task != null ) {
567
+ sendEventThreadPool .submit (task );
568
+ }
569
+ try {
570
+ // sleep added to avoid multiple SendEntity task within a single interval.
571
+ Thread .sleep (1000 );
572
+ } catch (InterruptedException e ) {
573
+ LOG .warn (SystemMetricsPublisher .class .getName ()
574
+ + " is interrupted. Exiting." );
575
+ break ;
576
+ }
577
+ }
578
+ }
579
+ }
580
+ }
0 commit comments