Skip to content

Commit 2ebbabc

Browse files
committed
JDK 7: added example of TransferQueue
1 parent caea3c9 commit 2ebbabc

File tree

2 files changed

+176
-0
lines changed

2 files changed

+176
-0
lines changed

java-7/README.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Java 7
2+
3+
## Features
4+
5+
* Collections API:
6+
* `TransferQueue<E>`
7+
* extends `BlockingQueue<E>` interface
8+
* implementated by `LinkedTransferQueue<E>`
9+
* blocking queue in which producer may wait for consumers to receive elements
10+
* it can publish an element and wait for consumers or just publish and forget
11+
* it can use a timeout to wait for a consumer
12+
* methods:
13+
* `getWaitingConsumerCount()`: returns an estimate of the number of waiting consumer to receive elements via `BlockingQueue` `take` or timed `poll`.
14+
* `hasWaitingConsumer()`: returns true if there is any waiting consumer.
15+
* `transfer(E)`: send an element and wait for the consumer.
16+
* `tryTransfer(E)`: send an element to a waiting consumer, return false immediately if there isn't any consumer.
17+
* `tryTransfer(E, long, TimeUnit)`: like `tryTransfer` but with a timeout to wait for a consumer before returning false.
18+

java-7/TransferQueueExample.java

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
import java.util.Scanner;
2+
import java.util.concurrent.BlockingQueue;
3+
import java.util.concurrent.LinkedTransferQueue;
4+
import java.util.concurrent.TransferQueue;
5+
import java.util.concurrent.TimeUnit;
6+
7+
public class TransferQueueExample {
8+
public static void main(String[] args) throws Exception {
9+
TransferQueue<String> queue = new LinkedTransferQueue<>();
10+
11+
var producer = new Producer(queue);
12+
var consumer = new Consumer(queue);
13+
14+
new Thread(consumer).start();
15+
new Thread(producer).start();
16+
17+
Thread.currentThread().join();
18+
}
19+
}
20+
21+
class Producer implements Runnable {
22+
private TransferQueue<String> queue;
23+
private Scanner input;
24+
25+
Producer(TransferQueue<String> queue) {
26+
this.queue = queue;
27+
this.input = new Scanner(System.in);
28+
}
29+
30+
public void run() {
31+
log("started");
32+
while (true) {
33+
log("=== Enter the number: 1 - wait; 2 - wait with timeout; 3 - transfer timedout");
34+
35+
var command = input.nextLine();
36+
switch (command) {
37+
case "1":
38+
send("wait");
39+
break;
40+
case "2":
41+
send("timeout");
42+
break;
43+
case "3":
44+
send("transfer_timeout");
45+
waitAndSendWithTimeout("command will not be received");
46+
break;
47+
default:
48+
log("Invalid command");
49+
}
50+
}
51+
}
52+
53+
private void send(String command) {
54+
try {
55+
log("Sending command");
56+
queue.transfer(command);
57+
log("Command sent and received");
58+
} catch (Exception ex) {
59+
log(ex.getMessage());
60+
}
61+
}
62+
63+
private void sendWithTimeout(String command) {
64+
try {
65+
boolean received = queue.tryTransfer(command, 3, TimeUnit.SECONDS);
66+
log("Command sent, received: " + received);
67+
} catch (Exception ex) {
68+
log(ex.getMessage());
69+
}
70+
}
71+
72+
private void waitAndSendWithTimeout(String command) {
73+
try {
74+
log("Sleeping...");
75+
Thread.sleep(1000);
76+
log("Sending command waiting for 3s");
77+
boolean received = queue.tryTransfer(command, 3, TimeUnit.SECONDS);
78+
log("Command sent, received: " + received);
79+
} catch (Exception ex) {
80+
log(ex.getMessage());
81+
}
82+
}
83+
84+
private void log(String message) {
85+
System.out.printf("[Producer] %s%n", message);
86+
}
87+
}
88+
89+
class Consumer implements Runnable {
90+
private BlockingQueue<String> queue;
91+
92+
Consumer(BlockingQueue<String> queue) {
93+
this.queue = queue;
94+
}
95+
96+
public void run() {
97+
log("started");
98+
String command = waitIndefinitely();
99+
100+
while (true) {
101+
switch (command) {
102+
case "wait":
103+
command = waitIndefinitely();
104+
break;
105+
case "timeout":
106+
command = waitWithTimeout(5);
107+
break;
108+
case "transfer_timeout":
109+
sleep();
110+
command = waitWithTimeout(5);
111+
break;
112+
default:
113+
log("Invalid command");
114+
command = waitIndefinitely();
115+
}
116+
}
117+
}
118+
119+
private String waitIndefinitely() {
120+
log("Waiting...");
121+
try {
122+
String command = queue.take();
123+
log("Command received: " + command);
124+
return command;
125+
} catch (Exception ex) {
126+
log(ex.getMessage());
127+
return null;
128+
}
129+
}
130+
131+
private String waitWithTimeout(long seconds) {
132+
log("Waiting " + seconds + "s");
133+
try {
134+
String command = queue.poll(seconds, TimeUnit.SECONDS);
135+
136+
if (command == null) {
137+
log("Timedout");
138+
return "";
139+
}
140+
141+
log("Command received: " + command);
142+
return command;
143+
} catch (Exception ex) {
144+
log(ex.getMessage());
145+
return null;
146+
}
147+
}
148+
149+
private void sleep() {
150+
try {
151+
Thread.sleep(5000);
152+
} catch (InterruptedException ex) {}
153+
}
154+
155+
private void log(String message) {
156+
System.out.printf("[Consumer] %s%n", message);
157+
}
158+
}

0 commit comments

Comments
 (0)