Skip to content

Commit b0a83eb

Browse files
authored
Merge pull request #142 from pusher/bug/concurrentMoficiationException_when_iterating_channels
Synchronise access to channelNameToChannelMap
2 parents a51b9dc + 6485a0c commit b0a83eb

File tree

2 files changed

+29
-10
lines changed

2 files changed

+29
-10
lines changed

src/main/java/com/pusher/client/channel/impl/ChannelManager.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.pusher.client.channel.impl;
22

3-
import java.util.HashMap;
43
import java.util.Map;
4+
import java.util.concurrent.ConcurrentHashMap;
55

66
import com.google.gson.Gson;
77
import com.pusher.client.AuthorizationFailureException;
@@ -20,7 +20,8 @@
2020
public class ChannelManager implements ConnectionEventListener {
2121

2222
private static final Gson GSON = new Gson();
23-
private final Map<String, InternalChannel> channelNameToChannelMap = new HashMap<String, InternalChannel>();
23+
private final Map<String, InternalChannel> channelNameToChannelMap = new ConcurrentHashMap<String, InternalChannel>();
24+
2425
private final Factory factory;
2526
private InternalConnection connection;
2627

@@ -70,8 +71,7 @@ public void setConnection(final InternalConnection connection) {
7071
connection.bind(ConnectionState.CONNECTED, this);
7172
}
7273

73-
public void subscribeTo(final InternalChannel channel, final ChannelEventListener listener,
74-
final String... eventNames) {
74+
public void subscribeTo(final InternalChannel channel, final ChannelEventListener listener, final String... eventNames) {
7575

7676
validateArgumentsAndBindEvents(channel, listener, eventNames);
7777
channelNameToChannelMap.put(channel.getName(), channel);
@@ -88,7 +88,6 @@ public void unsubscribeFrom(final String channelName) {
8888
if (channel == null) {
8989
return;
9090
}
91-
9291
if (connection.getState() == ConnectionState.CONNECTED) {
9392
sendUnsubscribeMessage(channel);
9493
}
@@ -116,8 +115,7 @@ public void onMessage(final String event, final String wholeMessage) {
116115
public void onConnectionStateChange(final ConnectionStateChange change) {
117116

118117
if (change.getCurrentState() == ConnectionState.CONNECTED) {
119-
120-
for (final InternalChannel channel : channelNameToChannelMap.values()) {
118+
for(final InternalChannel channel : channelNameToChannelMap.values()){
121119
sendOrQueueSubscribeMessage(channel);
122120
}
123121
}
@@ -181,8 +179,7 @@ public void run() {
181179
}
182180
}
183181

184-
private void validateArgumentsAndBindEvents(final InternalChannel channel, final ChannelEventListener listener,
185-
final String... eventNames) {
182+
private void validateArgumentsAndBindEvents(final InternalChannel channel, final ChannelEventListener listener, final String... eventNames) {
186183

187184
if (channel == null) {
188185
throw new IllegalArgumentException("Cannot subscribe to a null channel");

src/test/java/com/pusher/client/channel/impl/ChannelManagerTest.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.pusher.client.connection.ConnectionStateChange;
2525
import com.pusher.client.connection.impl.InternalConnection;
2626
import com.pusher.client.util.Factory;
27+
import java.util.concurrent.Executors;
2728

2829
@RunWith(MockitoJUnitRunner.class)
2930
public class ChannelManagerTest {
@@ -325,7 +326,7 @@ public void testGetChannelFromString(){
325326

326327
@Test
327328
public void testGetNonExistentChannelFromString(){
328-
Channel channel = channelManager.getChannel("woot");
329+
Channel channel = channelManager.getChannel("woot");
329330
assertNull(channel);
330331
}
331332

@@ -369,4 +370,25 @@ public void testGetNonExistentPresenceChannel(){
369370
PresenceChannel channel = channelManager.getPresenceChannel("presence-yolo");
370371
assertNull(channel);
371372
}
373+
374+
@Test
375+
public void testConcurrentModificationExceptionDoesNotHappenWhenConnectionIsEstablished() {
376+
for(int i = 0; i<1000; i++) {
377+
channelManager.subscribeTo(new ChannelImpl("channel" + i, factory), null);
378+
}
379+
380+
Runnable removeChannels = new Runnable() {
381+
@Override
382+
public void run() {
383+
System.out.println("Start unsubscribe");
384+
for(int i=900; i<1000; i++){
385+
channelManager.unsubscribeFrom("channel"+i);
386+
}
387+
System.out.println("end unsubscribe");
388+
}
389+
};
390+
Executors.newSingleThreadExecutor().submit(removeChannels);
391+
392+
channelManager.onConnectionStateChange(new ConnectionStateChange(ConnectionState.CONNECTING, ConnectionState.CONNECTED));
393+
}
372394
}

0 commit comments

Comments
 (0)