2020import java .util .logging .Logger ;
2121
2222/**
23- * Task hub worker that connects to a sidecar process over gRPC to execute orchestrator and activity events.
23+ * Task hub worker that connects to a sidecar process over gRPC to execute
24+ * orchestrator and activity events.
2425 */
2526public final class DurableTaskGrpcWorker implements AutoCloseable {
2627
@@ -39,8 +40,8 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
3940 private final TaskHubSidecarServiceBlockingStub sidecarClient ;
4041 private final boolean isExecutorServiceManaged ;
4142 private volatile boolean isNormalShutdown = false ;
42- private Thread processorThread ;
43-
43+ private Thread workerThread ;
44+
4445 DurableTaskGrpcWorker (DurableTaskGrpcWorkerBuilder builder ) {
4546 this .orchestrationFactories .putAll (builder .orchestrationFactories );
4647 this .activityFactories .putAll (builder .activityFactories );
@@ -67,46 +68,60 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
6768
6869 this .sidecarClient = TaskHubSidecarServiceGrpc .newBlockingStub (sidecarGrpcChannel );
6970 this .dataConverter = builder .dataConverter != null ? builder .dataConverter : new JacksonDataConverter ();
70- this .maximumTimerInterval = builder .maximumTimerInterval != null ? builder .maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL ;
71+ this .maximumTimerInterval = builder .maximumTimerInterval != null ? builder .maximumTimerInterval
72+ : DEFAULT_MAXIMUM_TIMER_INTERVAL ;
7173 this .workerPool = builder .executorService != null ? builder .executorService : Executors .newCachedThreadPool ();
7274 this .isExecutorServiceManaged = builder .executorService == null ;
7375 }
7476
7577 /**
76- * Establishes a gRPC connection to the sidecar and starts processing work-items in the background.
78+ * Establishes a gRPC connection to the sidecar and starts processing work-items
79+ * in the background.
7780 * <p>
78- * This method retries continuously to establish a connection to the sidecar. If a connection fails,
79- * a warning log message will be written and a new connection attempt will be made. This process
80- * continues until either a connection succeeds or the process receives an interrupt signal.
81+ * This method retries continuously to establish a connection to the sidecar. If
82+ * a connection fails,
83+ * a warning log message will be written and a new connection attempt will be
84+ * made. This process
85+ * continues until either a connection succeeds or the process receives an
86+ * interrupt signal.
8187 */
8288 public void start () {
83- this .processorThread = new Thread (this ::startAndBlock );
84- this .processorThread .start ();
89+ this .workerThread = new Thread (this ::startAndBlock );
90+ this .workerThread .start ();
8591 }
8692
8793
8894 /**
89- * Closes the internally managed gRPC channel and executor service, if one exists.
95+ * Closes the internally managed gRPC channel and executor service, if one
96+ * exists.
9097 * <p>
91- * Only the internally managed GRPC Channel and Executor services are closed. If any of them are supplied,
98+ * Only the internally managed GRPC Channel and Executor services are closed. If
99+ * any of them are supplied,
92100 * it is the responsibility of the supplier to take care of them.
93101 */
94102 public void close () {
103+ this .workerThread .interrupt ();
95104 this .isNormalShutdown = true ;
96105 this .processorThread .interrupt ();
97106 this .shutDownWorkerPool ();
98107 this .closeSideCarChannel ();
99108 }
100109
101110 /**
102- * Establishes a gRPC connection to the sidecar and starts processing work-items on the current thread.
103- * This method call blocks indefinitely, or until the current thread is interrupted.
111+ * Establishes a gRPC connection to the sidecar and starts processing work-items
112+ * on the current thread.
113+ * This method call blocks indefinitely, or until the current thread is
114+ * interrupted.
104115 * <p>
105- * Use can alternatively use the {@link #start} method to run orchestration processing in a background thread.
116+ * Use can alternatively use the {@link #start} method to run orchestration
117+ * processing in a background thread.
106118 * <p>
107- * This method retries continuously to establish a connection to the sidecar. If a connection fails,
108- * a warning log message will be written and a new connection attempt will be made. This process
109- * continues until either a connection succeeds or the process receives an interrupt signal.
119+ * This method retries continuously to establish a connection to the sidecar. If
120+ * a connection fails,
121+ * a warning log message will be written and a new connection attempt will be
122+ * made. This process
123+ * continues until either a connection succeeds or the process receives an
124+ * interrupt signal.
110125 */
111126 public void startAndBlock () {
112127 logger .log (Level .INFO , "Durable Task worker is connecting to sidecar at {0}." , this .getSidecarAddress ());
@@ -121,8 +136,7 @@ public void startAndBlock() {
121136 this .dataConverter ,
122137 logger );
123138
124- // TODO: How do we interrupt manually?
125- while (true && !this .isNormalShutdown ) {
139+ while (true ) {
126140 try {
127141 GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest .newBuilder ().build ();
128142 Iterator <WorkItem > workItemStream = this .sidecarClient .getWorkItems (getWorkItemsRequest );
@@ -149,11 +163,17 @@ public void startAndBlock() {
149163 this .sidecarClient .completeOrchestratorTask (response );
150164 } catch (StatusRuntimeException e ) {
151165 if (e .getStatus ().getCode () == Status .Code .UNAVAILABLE ) {
152- logger .log (Level .WARNING , "The sidecar at address {0} is unavailable while completing the orchestrator task." , this .getSidecarAddress ());
166+ logger .log (Level .WARNING ,
167+ "The sidecar at address {0} is unavailable while completing the orchestrator task." ,
168+ this .getSidecarAddress ());
153169 } else if (e .getStatus ().getCode () == Status .Code .CANCELLED ) {
154- logger .log (Level .WARNING , "Durable Task worker has disconnected from {0} while completing the orchestrator task." , this .getSidecarAddress ());
170+ logger .log (Level .WARNING ,
171+ "Durable Task worker has disconnected from {0} while completing the orchestrator task." ,
172+ this .getSidecarAddress ());
155173 } else {
156- logger .log (Level .WARNING , "Unexpected failure completing the orchestrator task at {0}." , this .getSidecarAddress ());
174+ logger .log (Level .WARNING ,
175+ "Unexpected failure completing the orchestrator task at {0}." ,
176+ this .getSidecarAddress ());
157177 }
158178 }
159179 });
@@ -193,29 +213,35 @@ public void startAndBlock() {
193213 this .sidecarClient .completeActivityTask (responseBuilder .build ());
194214 } catch (StatusRuntimeException e ) {
195215 if (e .getStatus ().getCode () == Status .Code .UNAVAILABLE ) {
196- logger .log (Level .WARNING , "The sidecar at address {0} is unavailable while completing the activity task." , this .getSidecarAddress ());
216+ logger .log (Level .WARNING ,
217+ "The sidecar at address {0} is unavailable while completing the activity task." ,
218+ this .getSidecarAddress ());
197219 } else if (e .getStatus ().getCode () == Status .Code .CANCELLED ) {
198- logger .log (Level .WARNING , "Durable Task worker has disconnected from {0} while completing the activity task." , this .getSidecarAddress ());
220+ logger .log (Level .WARNING ,
221+ "Durable Task worker has disconnected from {0} while completing the activity task." ,
222+ this .getSidecarAddress ());
199223 } else {
200- logger .log (Level .WARNING , "Unexpected failure completing the activity task at {0}." , this .getSidecarAddress ());
224+ logger .log (Level .WARNING , "Unexpected failure completing the activity task at {0}." ,
225+ this .getSidecarAddress ());
201226 }
202227 }
203228 });
204- }
205- else if (requestType == RequestCase .HEALTHPING )
206- {
229+ } else if (requestType == RequestCase .HEALTHPING ) {
207230 // No-op
208231 } else {
209- logger .log (Level .WARNING , "Received and dropped an unknown '{0}' work-item from the sidecar." , requestType );
232+ logger .log (Level .WARNING , "Received and dropped an unknown '{0}' work-item from the sidecar." ,
233+ requestType );
210234 }
211235 }
212236 } catch (StatusRuntimeException e ) {
213237 if (e .getStatus ().getCode () == Status .Code .UNAVAILABLE ) {
214- logger .log (Level .INFO , "The sidecar at address {0} is unavailable. Will continue retrying." , this .getSidecarAddress ());
238+ logger .log (Level .INFO , "The sidecar at address {0} is unavailable. Will continue retrying." ,
239+ this .getSidecarAddress ());
215240 } else if (e .getStatus ().getCode () == Status .Code .CANCELLED ) {
216241 logger .log (Level .INFO , "Durable Task worker has disconnected from {0}." , this .getSidecarAddress ());
217242 } else {
218- logger .log (Level .WARNING , String .format ("Unexpected failure connecting to %s" , this .getSidecarAddress ()), e );
243+ logger .log (Level .WARNING ,
244+ String .format ("Unexpected failure connecting to %s" , this .getSidecarAddress ()), e );
219245 }
220246
221247 // Retry after 5 seconds
@@ -229,7 +255,8 @@ else if (requestType == RequestCase.HEALTHPING)
229255 }
230256
231257 /**
232- * Stops the current worker's listen loop, preventing any new orchestrator or activity events from being processed.
258+ * Stops the current worker's listen loop, preventing any new orchestrator or
259+ * activity events from being processed.
233260 */
234261 public void stop () {
235262 this .close ();
@@ -250,7 +277,8 @@ private void closeSideCarChannel() {
250277 private void shutDownWorkerPool () {
251278 if (this .isExecutorServiceManaged ) {
252279 if (!this .isNormalShutdown ) {
253- logger .log (Level .WARNING , "ExecutorService shutdown initiated unexpectedly. No new tasks will be accepted" );
280+ logger .log (Level .WARNING ,
281+ "ExecutorService shutdown initiated unexpectedly. No new tasks will be accepted" );
254282 }
255283
256284 this .workerPool .shutdown ();
0 commit comments