23
23
24
24
import java .io .IOException ;
25
25
import java .net .InetSocketAddress ;
26
+ import java .net .StandardSocketOptions ;
26
27
import java .nio .ByteBuffer ;
27
28
import java .nio .channels .SocketChannel ;
28
29
import java .util .concurrent .ArrayBlockingQueue ;
31
32
import java .util .concurrent .atomic .AtomicLong ;
32
33
import java .util .concurrent .locks .ReentrantLock ;
33
34
34
- public class NaRPCEndpoint <R extends NaRPCMessage , T extends NaRPCMessage > extends NaRPCChannel {
35
+ public class NaRPCEndpoint <R extends NaRPCMessage , T extends NaRPCMessage > {
35
36
private NaRPCGroup group ;
36
37
private ConcurrentHashMap <Long , NaRPCFuture <R ,T >> pendingRPCs ;
37
38
private ArrayBlockingQueue <ByteBuffer > bufferQueue ;
@@ -43,81 +44,105 @@ public class NaRPCEndpoint<R extends NaRPCMessage, T extends NaRPCMessage> exten
43
44
public NaRPCEndpoint (NaRPCGroup group , SocketChannel channel ) throws Exception {
44
45
this .group = group ;
45
46
this .channel = channel ;
47
+ this .channel .setOption (StandardSocketOptions .TCP_NODELAY , group .isNodelay ());
48
+ this .channel .setOption (StandardSocketOptions .SO_REUSEADDR , true );
46
49
this .pendingRPCs = new ConcurrentHashMap <Long , NaRPCFuture <R ,T >>();
47
- this .readLock = new ReentrantLock ();
48
- this .writeLock = new ReentrantLock ();
49
50
this .bufferQueue = new ArrayBlockingQueue <ByteBuffer >(group .getQueueDepth ());
50
51
for (int i = 0 ; i < group .getQueueDepth (); i ++){
51
52
ByteBuffer buffer = ByteBuffer .allocate (group .getMessageSize ());
52
53
bufferQueue .put (buffer );
53
54
}
54
55
this .sequencer = new AtomicLong (1 );
56
+ this .readLock = new ReentrantLock ();
57
+ this .writeLock = new ReentrantLock ();
58
+ }
59
+
60
+ public void connect (InetSocketAddress address ) throws IOException {
61
+ this .channel .connect (address );
62
+ this .channel .configureBlocking (false );
63
+ }
64
+
65
+ public void close () throws IOException {
66
+ this .channel .close ();
55
67
}
56
68
57
69
public NaRPCFuture <R ,T > issueRequest (R request , T response ) throws IOException {
58
70
ByteBuffer buffer = getBuffer ();
71
+ while (buffer == null ){
72
+ buffer = getBuffer ();
73
+ }
59
74
long ticket = sequencer .getAndIncrement ();
60
- makeMessage (ticket , request , buffer );
75
+ NaRPCProtocol . makeMessage (ticket , request , buffer );
61
76
NaRPCFuture <R ,T > future = new NaRPCFuture <R ,T >(this , request , response , ticket );
62
77
pendingRPCs .put (ticket , future );
63
- while (!tryTransmitting (buffer )){
78
+
79
+ boolean wlocked = writeLock .tryLock ();
80
+ if (wlocked ){
81
+ channel .write (buffer );
82
+ while (buffer .hasRemaining ()){
83
+ pollResponse ();
84
+ channel .write (buffer );
85
+ }
86
+ writeLock .unlock ();
64
87
}
88
+
65
89
putBuffer (buffer );
66
90
return future ;
67
91
}
68
-
69
- public void pollResponse (AtomicBoolean done ) throws IOException {
92
+
93
+ void pollResponse () throws IOException {
70
94
ByteBuffer buffer = getBuffer ();
71
- boolean locked = readLock .tryLock ();
72
- if (locked ) {
73
- if (!done .get ()){
74
- long ticket = fetchBuffer (channel , buffer );
75
- if (ticket < 0 ){
76
- throw new IOException ("Got invalid ticket, connection closed? " + ticket );
77
- }
78
- NaRPCFuture <R ,T > future = pendingRPCs .remove (ticket );
79
- future .getResponse ().update (buffer );
80
- future .signal ();
81
- }
95
+ if (buffer == null ){
96
+ return ;
97
+ }
98
+ boolean rlocked = readLock .tryLock ();
99
+ long ticket = 0 ;
100
+ if (rlocked ){
101
+ ticket = NaRPCProtocol .fetchBuffer (channel , buffer );
82
102
readLock .unlock ();
83
- }
103
+ }
104
+ if (ticket > 0 ){
105
+ NaRPCFuture <R , T > future = pendingRPCs .remove (ticket );
106
+ future .getResponse ().update (buffer );
107
+ future .signal ();
108
+ }
84
109
putBuffer (buffer );
85
110
}
86
-
87
- public void connect (InetSocketAddress address ) throws IOException {
88
- this .channel .connect (address );
89
- this .channel .socket ().setTcpNoDelay (group .isNodelay ());
90
- this .channel .socket ().setReuseAddress (true );
91
- this .channel .configureBlocking (false );
92
- }
93
-
94
- public void close () throws IOException {
95
- this .channel .close ();
96
- }
97
111
98
112
public String address () throws IOException {
99
113
return channel .getRemoteAddress ().toString ();
100
114
}
101
115
102
- private boolean tryTransmitting (ByteBuffer buffer ) throws IOException {
103
- boolean locked = writeLock .tryLock ();
104
- if (locked ) {
105
- transmitMessage (channel , buffer );
106
- writeLock .unlock ();
107
- return true ;
108
- }
109
- return false ;
110
- }
111
-
112
116
private ByteBuffer getBuffer (){
113
117
ByteBuffer buffer = bufferQueue .poll ();
114
- while (buffer == null ){
115
- buffer = bufferQueue .poll ();
116
- }
117
118
return buffer ;
118
119
}
119
120
120
121
private void putBuffer (ByteBuffer buffer ){
121
122
bufferQueue .add (buffer );
122
123
}
124
+
125
+ //----------- Old stuff
126
+
127
+ // public NaRPCFuture<R,T> issueRequestOld(R request, T response) throws IOException {
128
+ // ByteBuffer buffer = getBuffer();
129
+ // long ticket = sequencer.getAndIncrement();
130
+ // makeMessage(ticket, request, buffer);
131
+ // NaRPCFuture<R,T> future = new NaRPCFuture<R,T>(this, request, response, ticket);
132
+ // pendingRPCs.put(ticket, future);
133
+ // while(!tryTransmittingOld(buffer)){
134
+ // }
135
+ // putBuffer(buffer);
136
+ // return future;
137
+ // }
138
+
139
+ // private boolean tryTransmittingOld(ByteBuffer buffer) throws IOException{
140
+ // boolean locked = writeLock.tryLock();
141
+ // if (locked) {
142
+ // transmitMessageOld(channel, buffer);
143
+ // writeLock.unlock();
144
+ // return true;
145
+ // }
146
+ // return false;
147
+ // }
123
148
}
0 commit comments