ScheduledThreadPoolExecutor
When you have a task that you need to execute periodically, you need a ScheduledThreadPoolExecutor. Sometimes, these tasks do the work themselves. Sometimes, they simply find work to be done and then create subtasks of work to be done by some other asynchronous service such as an ExecutorService, ExecutorCompletionService, or ForkJoinPool.
A ScheduledThreadPoolExecutor extends ExecutorService and consequently exposes all of its functionality. In addition, it exposes several schedule methods that accept a task for periodic execution.
You might think of a ScheduledThreadPoolExecutor in the same manner as a CF Scheduled Task. The primary differences are that tasks can be scheduled at intervals as fine as the nanosecond level, and they are entirely runtime configurable. This means you can cancel, pause, and create new tasks as your application sees fit.
Tasks and their execution environment are covered in ExecutorService. Here, we'll cover just the usage specific to ScheduledThreadPoolExecutor.
You can see a ScheduledThreadPoolExecutor in action when you download CFConcurrent and run examples/ScheduledThreadPoolExecutor/index.cfm.
You'll notice that the ScheduledThreadPoolExecutor accepts a Runnable and not a Callable. The reason is that scheduled tasks are not result-returning tasks... they run on a schedule and "do things"... they do not run once and "return things".
For you, this simply means that your Task must have a run() method that returns void. Moving along...
In most cases, you'll create the ScheduledThreadPoolExecutor as an application-scoped variable in Application.cfc. At time of construction, you'll most likely create the tasks that you wish to execute on a schedule. This example demonstrates scheduling a single task, though the mechanics are identical for N+1 tasks:
    component {

        this.name = "yourAwesomeApp";

        function onApplicationStart(){
            // create and start the scheduler once, at application scope
            application.executorService = createObject("component", "cfconcurrent.ScheduledThreadPoolExecutor")
                .init( serviceName = "ScheduledThreadPoolExecutorExample", maxConcurrent = 0 );
            application.executorService.setLoggingEnabled( true );
            application.executorService.start();

            // now schedule a runnable task to run every 2 seconds
            application.task1 = createObject("component", "SimpleRunnableTask").init( "task1" );
            application.executorService.scheduleAtFixedRate("task1", application.task1, 0, 2, application.executorService.getObjectFactory().SECONDS);
        }

        function onRequestStart(){
            // allow a manual restart via ?reinit=hotdawg
            if( structKeyExists(url, "reinit") and url.reinit eq "hotdawg" ){
                applicationStop();
                onApplicationStop();
            }
        }

        function onApplicationStop(){
            // always stop the scheduler so its threads do not outlive the application
            application.executorService.stop();
        }

    }

You'll note that we've created a new Task and scheduled it with scheduleAtFixedRate(). We give it a start delay of 0, which signals that it should start immediately, and a period of 2, which signifies that it should run every 2 units, where in this case the unit is seconds.
Thus, every 2 seconds, until the service is stopped, it'll execute SimpleRunnableTask.run().
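For readers who want to see what sits underneath, the following is a minimal Java sketch of the same kind of call made directly against java.util.concurrent.ScheduledThreadPoolExecutor. It is not CFConcurrent's implementation; the class name FixedRateSketch and the helper countRuns are hypothetical, and the period is given in milliseconds only to keep the demo quick.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of CFConcurrent.
final class FixedRateSketch {

    // Schedules a counting task at a fixed rate, waits until it has fired
    // at least `runs` times, then stops the scheduler and returns the count.
    static int countRuns(int runs, long periodMillis) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(runs);

        // A Runnable: run() returns nothing, it just "does things".
        Runnable task = () -> {
            counter.incrementAndGet();
            done.countDown();
        };

        try {
            // Initial delay 0 means "start immediately"; afterwards the task
            // fires once per period, expressed in the given TimeUnit.
            scheduler.scheduleAtFixedRate(task, 0, periodMillis, TimeUnit.MILLISECONDS);
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            // Mirrors stop() in onApplicationStop(): no further executions.
            scheduler.shutdownNow();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("runs observed: " + countRuns(3, 200));
    }
}
```

The argument order (task, initial delay, period, unit) lines up with the last four arguments of the CFML call above; the leading task Id argument is a CFConcurrent addition.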
You can also use scheduleWithFixedDelay(). The difference is that scheduleAtFixedRate will attempt to catch up whenever an execution takes longer than the configured rate. For example, if an execution lasts 3 seconds, the next execution fires immediately after it completes, so that the task stays as close as possible to its 2-second schedule.
scheduleWithFixedDelay will not catch up. In this case, the 2-second configuration specifies the delay between the end of one execution and the start of the next, and not the rate. Thus, when any given execution lasts longer than 2 seconds, it does not attempt to catch up. It still waits 2 seconds after finishing before firing again.
If you care about it staying on schedule -- Fire every 2 seconds and catch up if you take too long! -- use scheduleAtFixedRate.
If you care more about the delay than the schedule -- Always wait 2 seconds between executions! -- use scheduleWithFixedDelay.
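The difference between the two methods can be observed with a small Java sketch against the underlying java.util.concurrent class. The class name RateVersusDelaySketch and its constants are hypothetical; the 100 ms period and 150 ms task duration are scaled-down stand-ins for the 2-second setting and 3-second execution described above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of CFConcurrent.
final class RateVersusDelaySketch {

    static final long PERIOD_MS = 100; // stands in for the 2-second setting
    static final long WORK_MS = 150;   // stands in for the 3-second execution

    // Returns the milliseconds between the start of the first run and the
    // start of the `runs`-th run of a task that overruns its period.
    static long elapsedBetweenStarts(boolean fixedRate, int runs) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        List<Long> starts = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch started = new CountDownLatch(runs);

        Runnable slowTask = () -> {
            starts.add(System.nanoTime());
            started.countDown();
            try {
                Thread.sleep(WORK_MS); // simulate work longer than the period
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        try {
            if (fixedRate) {
                scheduler.scheduleAtFixedRate(slowTask, 0, PERIOD_MS, TimeUnit.MILLISECONDS);
            } else {
                scheduler.scheduleWithFixedDelay(slowTask, 0, PERIOD_MS, TimeUnit.MILLISECONDS);
            }
            started.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            scheduler.shutdownNow();
        }
        return (starts.get(runs - 1) - starts.get(0)) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("fixed rate:  " + elapsedBetweenStarts(true, 3) + " ms");
        System.out.println("fixed delay: " + elapsedBetweenStarts(false, 3) + " ms");
    }
}
```

With these numbers, the fixed-rate runs should start roughly back to back (about one task duration apart), while the fixed-delay runs should be separated by the task duration plus the full delay. Exact timings vary by machine.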
When using a ScheduledThreadPoolExecutor directly in ColdFusion, without CFConcurrent, cancelling a Task is tedious. It's also a common enough use case that CFConcurrent treats it as a first-class citizen. To that end, CFConcurrent's API requires a unique "task Id" when scheduling a task.
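To illustrate the bookkeeping involved when working with the raw Java class, here is a minimal sketch: every schedule call returns a ScheduledFuture handle, and you must keep that handle somewhere to cancel the task later. The class CancellableScheduleSketch and its methods are hypothetical and only mirror the idea of an Id-keyed registry; this is an assumption about the general approach, not CFConcurrent's actual code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper; illustrates an Id-keyed registry of task handles.
final class CancellableScheduleSketch {

    private final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
    private final Map<String, ScheduledFuture<?>> handles = new ConcurrentHashMap<>();

    // Schedules the task and remembers its handle under the given Id.
    void schedule(String taskId, Runnable task, long periodMillis) {
        ScheduledFuture<?> handle =
            scheduler.scheduleAtFixedRate(task, 0, periodMillis, TimeUnit.MILLISECONDS);
        ScheduledFuture<?> previous = handles.put(taskId, handle);
        if (previous != null) {
            previous.cancel(false); // replace any task already using this Id
        }
    }

    // Cancels future executions; returns false if the Id is unknown.
    boolean cancelTask(String taskId) {
        ScheduledFuture<?> handle = handles.remove(taskId);
        return handle != null && handle.cancel(false);
    }

    void stop() {
        scheduler.shutdownNow();
    }

    // Demo: returns true if the task ran, was cancelled, and then stopped running.
    static boolean cancelStopsTask() {
        CancellableScheduleSketch service = new CancellableScheduleSketch();
        AtomicInteger runs = new AtomicInteger();
        try {
            service.schedule("task1", runs::incrementAndGet, 20);
            Thread.sleep(100);
            boolean cancelled = service.cancelTask("task1");
            Thread.sleep(50); // let any in-flight run finish
            int afterCancel = runs.get();
            Thread.sleep(100);
            return cancelled && afterCancel > 0 && runs.get() == afterCancel;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            service.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println("cancel stopped the task: " + cancelStopsTask());
    }
}
```

Passing false to cancel() lets an execution that is already in progress finish, and only prevents later ones.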
When you wish to cancel a scheduled task, simply pass that task Id to cancelTask():
    application.executorService.cancelTask( "task1" );

Dave Ferguson created a LogWatcher application using CF10's WebSockets. When a user selects a log to be watched, the application establishes a WebSocket connection, which continuously reads that log file and pushes new lines to the client.
This uses a ScheduledThreadPoolExecutor. When a client connects, a new FilePublisherTask is scheduled. That task is responsible for reading the file and pushing results to the client. When the user disconnects, the task is cancelled.
You might have an application that periodically cleans a directory of files. Perhaps your application writes thousands of files an hour to a bunch of directories, and you need to aggressively clean them. Using a ScheduledThreadPoolExecutor, you might schedule a separate Task for each directory. On a schedule, each of those tasks might recurse through its directory's children and in turn submit FileDeletionTasks to a shared ExecutorService. You might even discover a more performant approach with a ForkJoinPool.
Perhaps you have a workflow engine wherein multiple external producers submit jobs for consumption. You might create a Task that runs every second whose purpose is to find work to be done, and it in turn schedules that work amongst multiple consumer servers.
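The "find work, then hand it off" pattern in the last two examples can be sketched in plain Java as follows. This is only an illustration under assumptions: the class WorkFinderSketch, the in-memory queue standing in for the external producers, and the pool size of 4 are all hypothetical.

```java
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of CFConcurrent.
final class WorkFinderSketch {

    // A scheduled "finder" task polls for pending jobs and hands each one
    // to a shared worker pool. Returns the number of jobs handled.
    static int dispatchAll(String... jobs) {
        Queue<String> pending = new ConcurrentLinkedQueue<>(Arrays.asList(jobs));
        Queue<String> handled = new ConcurrentLinkedQueue<>();
        CountDownLatch allDone = new CountDownLatch(jobs.length);

        ExecutorService workers = Executors.newFixedThreadPool(4);
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);

        // The finder does no real work itself; it only creates subtasks.
        Runnable finder = () -> {
            String job;
            while ((job = pending.poll()) != null) {
                String current = job;
                workers.execute(() -> {
                    handled.add(current); // stand-in for the real work
                    allDone.countDown();
                });
            }
        };

        try {
            scheduler.scheduleWithFixedDelay(finder, 0, 50, TimeUnit.MILLISECONDS);
            allDone.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            scheduler.shutdownNow();
            workers.shutdown();
        }
        return handled.size();
    }

    public static void main(String[] args) {
        System.out.println("jobs handled: " + dispatchAll("a", "b", "c"));
    }
}
```

Keeping the finder cheap and pushing the real work to a separate pool means a slow job does not delay the next scheduled scan.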