Skip to content

Commit 09a3047

Browse files
authored
Merge pull request #1754 from Caltech-IPAC/FIREFLY-1727-handle-redis-failure
FIREFLY-1727: Handle Redis failure
2 parents 770c607 + e0ad8f1 commit 09a3047

File tree

16 files changed

+161
-190
lines changed

16 files changed

+161
-190
lines changed

src/firefly/java/edu/caltech/ipac/firefly/core/RedisService.java

+49-37
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
package edu.caltech.ipac.firefly.core;
66

7+
import edu.caltech.ipac.firefly.data.ServerEvent;
8+
import edu.caltech.ipac.firefly.server.events.FluxAction;
9+
import edu.caltech.ipac.firefly.server.events.ServerEventManager;
710
import edu.caltech.ipac.firefly.server.util.Logger;
811
import edu.caltech.ipac.util.AppProperties;
912
import redis.clients.jedis.Jedis;
@@ -14,17 +17,17 @@
1417

1518
import java.io.File;
1619
import java.io.IOException;
17-
import java.net.ConnectException;
1820
import java.security.MessageDigest;
1921
import java.security.NoSuchAlgorithmException;
2022
import java.time.Duration;
23+
import java.time.Instant;
24+
import java.time.ZoneId;
25+
import java.time.format.DateTimeFormatter;
2126
import java.time.temporal.ChronoUnit;
2227
import java.util.Arrays;
2328
import java.util.LinkedHashMap;
2429
import java.util.Map;
2530

26-
import static edu.caltech.ipac.firefly.core.RedisService.Status.ONLINE;
27-
import static edu.caltech.ipac.firefly.core.RedisService.Status.OFFLINE;
2831
import static edu.caltech.ipac.firefly.core.Util.Try;
2932

3033

@@ -45,7 +48,6 @@
4548
* @version $Id: $
4649
*/
4750
public class RedisService {
48-
public enum Status {ONLINE, FAIL_TO_CONNECT, OFFLINE};
4951
public static final String REDIS_HOST = "redis.host";
5052
public static final String MAX_POOL_SIZE = "redis.max.poolsize";
5153
private static final int REDIS_PORT = AppProperties.getIntProperty("redis.port", 6379);
@@ -58,18 +60,14 @@ public enum Status {ONLINE, FAIL_TO_CONNECT, OFFLINE};
5860
private static final String redisHost = AppProperties.getProperty(REDIS_HOST, "localhost");
5961
public static final int maxPoolSize = AppProperties.getIntProperty(MAX_POOL_SIZE, 100);
6062
private static JedisPool jedisPool;
61-
private static Status status = Status.OFFLINE;
63+
private static Instant failSince;
6264

6365
private static String getRedisPassword() {
6466
String passwd = System.getenv("REDIS_PASSWORD");
6567
if (passwd == null) passwd = AppProperties.getProperty("REDIS_PASSWORD");
6668
return passwd;
6769
}
6870

69-
static {
70-
connect();
71-
}
72-
7371
static JedisPool createJedisPool() {
7472
try {
7573
JedisPoolConfig pconfig = new JedisPoolConfig();
@@ -103,42 +101,61 @@ static void startLocal() {
103101
} catch (IOException ignored) {}
104102
}
105103

106-
public static boolean connect() throws RuntimeException {
107-
if (jedisPool != null && !jedisPool.isClosed()) {
108-
return true; // already connected
109-
}
110-
jedisPool = createJedisPool();
111-
if (jedisPool == null && redisHost.equals("localhost")) {
112-
// can't connect; will start up embedded version if localhost
113-
startLocal();
114-
jedisPool = createJedisPool();
115-
}
116-
if (jedisPool == null) {
117-
LOG.error("Unable to connect to Redis at " + redisHost + ":" + REDIS_PORT);
118-
status = Status.FAIL_TO_CONNECT;
119-
return false;
120-
} else {
121-
status = ONLINE;
122-
return true;
104+
public static Instant getFailSince() { return failSince; }
105+
106+
public static Jedis getConnection() throws Exception {
107+
try {
108+
if (jedisPool == null || jedisPool.isClosed()) {
109+
jedisPool = createJedisPool();
110+
if (jedisPool == null && redisHost.equals("localhost")) {
111+
// can't connect; will start up embedded version if localhost
112+
startLocal();
113+
jedisPool = createJedisPool();
114+
}
115+
if (jedisPool == null || jedisPool.isClosed()) {
116+
if (jedisPool != null) {
117+
Try.it(jedisPool::close);
118+
jedisPool = null;
119+
}
120+
throw new RuntimeException("Unable to connect to Redis at " + redisHost + ":" + REDIS_PORT);
121+
}
122+
}
123+
Jedis jedis = jedisPool.getResource();
124+
if (failSince != null) updateConnectionStatus(false);
125+
return jedis;
126+
} catch (Exception e) {
127+
if (failSince == null) {
128+
updateConnectionStatus(true);
129+
}
130+
throw e;
123131
}
124132
}
125133

134+
// Because Redis may be down, we need to use processEvent to send this status event directly to clients currently connected
135+
// to this server and not rely on distributed event messaging.
136+
public static void updateConnectionStatus(boolean lost) {
137+
failSince = lost ? Instant.now() : null;
138+
String reason = lost ? "A critical system component is currently unavailable" : "";
139+
140+
FluxAction action = new FluxAction("app_data.appUpdate");
141+
action.setValue(lost, "connectionStatus", "lost");
142+
action.setValue(reason, "connectionStatus", "reason");
143+
ServerEventManager.processEvent(ServerEventManager.convertTo(action, ServerEvent.Scope.WORLD));
144+
}
145+
126146
public static void teardown() {
127147
disconnect();
128148
File[] files = new File(DB_DIR).listFiles();
129149
if (files!= null) Arrays.stream(files).forEach(File::delete);
130150
}
131151

132152
public static void disconnect() {
133-
status = OFFLINE;
134153
if (jedisPool != null) {
135154
jedisPool.close();
136155
jedisPool = null;
137156
}
138157
}
139158

140-
public static Status getStatus() { return status; }
141-
142159
public static Map<String, Object> getStats() {
143160

144161
LinkedHashMap<String, Object> stats = new LinkedHashMap<>();
@@ -187,13 +204,8 @@ public static void addStat(Map<String, Object> stats, Jedis redis, String key) {
187204
}
188205

189206
public static String getRedisHostPortDesc() {
190-
return redisHost + ":" + REDIS_PORT + " ("+ getStatus() + ")";
191-
}
192-
193-
public static Jedis getConnection() throws Exception {
194-
if (connect()) {
195-
return jedisPool.getResource();
196-
}
197-
throw new ConnectException("Unable to connect to Redis at " + redisHost + ":" + REDIS_PORT);
207+
String status = failSince == null ? "OK" :
208+
"Failed since " + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault()).format(failSince);
209+
return redisHost + ":" + REDIS_PORT + " (%s)".formatted(status);
198210
}
199211
}

src/firefly/java/edu/caltech/ipac/firefly/messaging/Messenger.java

+47-32
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@
99
import redis.clients.jedis.Jedis;
1010
import redis.clients.jedis.JedisPubSub;
1111

12+
import java.time.Duration;
13+
import java.time.Instant;
14+
import java.util.Collections;
15+
import java.util.Map;
1216
import java.util.concurrent.ConcurrentHashMap;
1317
import java.util.concurrent.CopyOnWriteArrayList;
1418
import java.util.concurrent.atomic.AtomicInteger;
@@ -31,7 +35,7 @@ public class Messenger {
3135
private static Logger.LoggerImpl LOG = Logger.getLogger();
3236

3337
// used to limit subscriptions to one thread per topic
34-
private static ConcurrentHashMap<String, SubscriberHandle> pubSubHandlers = new ConcurrentHashMap<>();
38+
private static ConcurrentHashMap<String, SubscriberHandler> pubSubHandlers = new ConcurrentHashMap<>();
3539

3640
/**
3741
* @param topic the topic to subscribe to
@@ -41,11 +45,11 @@ public class Messenger {
4145
public static Subscriber subscribe(String topic, Subscriber subscriber) {
4246
if (pubSubHandlers.containsKey(topic)) {
4347
LOG.trace("Add subscriber to existing topic: " + topic);
44-
SubscriberHandle pubSub = pubSubHandlers.get(topic);
48+
SubscriberHandler pubSub = pubSubHandlers.get(topic);
4549
pubSub.addSubscriber(subscriber);
4650
} else {
4751
LOG.trace("Add subscriber to new topic: " + topic);
48-
SubscriberHandle pubSub = new SubscriberHandle(topic);
52+
SubscriberHandler pubSub = new SubscriberHandler(topic);
4953
pubSubHandlers.put(topic, pubSub);
5054
pubSub.addSubscriber(subscriber);
5155
}
@@ -55,6 +59,9 @@ public static Subscriber subscribe(String topic, Subscriber subscriber) {
5559
public static int getSubscribedTopics() {
5660
return pubSubHandlers.size();
5761
}
62+
public static Map<String, SubscriberHandler> getSubscribers() {
63+
return Collections.unmodifiableMap(pubSubHandlers);
64+
}
5865

5966
/**
6067
* @param subscriber the subscriber to remove
@@ -90,19 +97,21 @@ public static void publish(Message msg) {
9097
* Internal handler class used to manage the one-to-many relationship of Messenger's subscriber and
9198
* Jedis's subscriber
9299
*/
93-
static class SubscriberHandle {
100+
public static class SubscriberHandler {
94101

95102
private final CopyOnWriteArrayList<Subscriber> subscribers = new CopyOnWriteArrayList<>();
96103
private final String topic;
97104
private final AtomicInteger retries = new AtomicInteger(5);
105+
private Instant failSince = Instant.now();
106+
private Thread subscriberThread;
98107
JedisPubSub jPubSub = new JedisPubSub() {
99108
public void onMessage(String channel, String message) {
100109
Message msg = Message.parse(message);
101110
subscribers.forEach((sub) -> {
102111
try {
103112
sub.onMessage(msg);
104113
} catch (Exception e) {
105-
LOG.warn("Error while processing message: " + e.getMessage());
114+
LOG.error(e, "Error while processing message: " + message);
106115
}
107116
});
108117
}
@@ -115,13 +124,26 @@ public void onUnsubscribe(String channel, int subscribedChannels) {
115124

116125
};
117126

118-
SubscriberHandle(String topic) {
127+
SubscriberHandler(String topic) {
119128
this.topic = topic;
120129
}
121130

122131
void addSubscriber(Subscriber sub) {
123132
subscribers.add(sub);
124-
subscribe();
133+
if (subscriberThread == null || !subscriberThread.isAlive()) {
134+
// Start the subscriber in a separate thread; use only one thread per topic
135+
subscriberThread = new Thread(() -> {
136+
try {
137+
subscribeToRedis();
138+
} catch (Exception e) {
139+
LOG.error(e, "Error while subscribing to topic: " + topic);
140+
} finally {
141+
subscriberThread = null;
142+
LOG.trace("exiting subscribing to topic: " + topic);
143+
}
144+
});
145+
subscriberThread.start();
146+
}
125147
}
126148

127149
void removeSubscriber(Subscriber sub) {
@@ -132,33 +154,26 @@ void removeSubscriber(Subscriber sub) {
132154
}
133155
}
134156

135-
void subscribe() {
136-
if (subscribers.size() > 1 || jPubSub.isSubscribed()) return; // already subscribed; do nothing.
137-
138-
Thread subscriberThread = new Thread(() -> {
139-
LOG.trace("start subscribing to topic: " + topic);
140-
try (Jedis jedis = RedisService.getConnection()) {
141-
jedis.subscribe(jPubSub, topic); // Blocks here
142-
} catch (Exception e) {
143-
LOG.error(e, "Error while subscribing to topic: " + topic);
144-
if (!subscribers.isEmpty()) {
145-
if (retries.decrementAndGet() != 0) {
146-
try {
147-
LOG.info("Retry subscribing to %s after 1s wait".formatted(topic));
148-
Thread.sleep(1_000);
149-
subscribe();
150-
} catch (InterruptedException ignored) {}
151-
} else {
152-
LOG.info("Gave up subscribing after a connection failure to the topic: " + topic);
153-
pubSubHandlers.remove(topic); // give up and clear topic cache
154-
}
157+
public Instant getFailSince() {
158+
return failSince;
159+
}
160+
161+
void subscribeToRedis() throws InterruptedException {
162+
LOG.trace("start subscribing to topic: " + topic);
163+
try (Jedis jedis = RedisService.getConnection()) {
164+
failSince = null;
165+
jedis.subscribe(jPubSub, topic); // Blocks here
166+
} catch (Exception e) {
167+
if (failSince == null) failSince = Instant.now();
168+
if (!subscribers.isEmpty()) {
169+
long secSinceFailed = Duration.between(getFailSince(), Instant.now()).toSeconds();
170+
if (secSinceFailed % 5 == 0) {
171+
LOG.info("Subscriber (%s) has been disconnected from Redis for %d seconds".formatted(topic, secSinceFailed));
155172
}
173+
Thread.sleep(1_000);
174+
subscribeToRedis();
156175
}
157-
LOG.trace("exiting subscribing to topic: " + topic);
158-
});
159-
// Start the subscriber in a separate thread
160-
subscriberThread.start();
176+
}
161177
}
162178
}
163-
164179
}

src/firefly/java/edu/caltech/ipac/firefly/server/AppServerCommands.java

+5
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
*/
44
package edu.caltech.ipac.firefly.server;
55

6+
import edu.caltech.ipac.firefly.core.RedisService;
67
import edu.caltech.ipac.firefly.core.background.JobManager;
78
import edu.caltech.ipac.firefly.data.Alert;
89
import edu.caltech.ipac.firefly.data.ServerEvent;
@@ -20,6 +21,7 @@
2021
import org.json.simple.JSONObject;
2122
import org.json.simple.parser.JSONParser;
2223
import org.json.simple.parser.ParseException;
24+
import redis.clients.jedis.Jedis;
2325

2426
import java.util.ArrayList;
2527
import java.util.HashMap;
@@ -55,6 +57,9 @@ public String doCommand(SrvParam params) throws Exception {
5557
action.setValue(bgInfo.notifEnabled(), "notifEnabled");
5658
ServerEventManager.fireAction(action, ServerEvent.Scope.SELF);
5759

60+
// check for redis connection
61+
if (RedisService.getFailSince() != null) RedisService.updateConnectionStatus(true);
62+
5863
return "true";
5964
}
6065
}

src/firefly/java/edu/caltech/ipac/firefly/server/ServerContext.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ public static void configInit() {
146146
configDirname = configDirname == null ? null : configDirname + "/" + contextName;
147147

148148
if (StringUtils.isEmpty(contextName)) {
149-
String errmsg = " is not setup correctly. System will not function properly";
149+
String errmsg = "Failed to init. System will not function properly";
150150
throw new RuntimeException(errmsg);
151151
};
152152

src/firefly/java/edu/caltech/ipac/firefly/server/events/ServerEventManager.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ public static void fireJsonAction(String actionStr, ServerEvent.EventTarget targ
6868
ServerEventManager.fireEvent(sev);
6969
}
7070

71-
71+
public static ServerEvent convertTo(FluxAction action, ServerEvent.Scope scope) {
72+
return new ServerEvent(Name.ACTION, makeTarget(scope), action.toString());
73+
}
7274

7375
public static void fireEvent(ServerEvent sev) {
7476
if (sev == null || sev.getTarget() == null || !sev.getTarget().hasDestination()) {
@@ -120,7 +122,8 @@ static List<ServerEventQueue> getAllEventQueue() {
120122
return allEventQueues.getCombinedNodeList();
121123
}
122124

123-
static void processEvent(ServerEvent ev) {
125+
// Bypasses distributed event messaging. Should not be called directly unless you know exactly why it's needed.
126+
public static void processEvent(ServerEvent ev) {
124127
totalEventCnt++;
125128
boolean delivered = false;
126129
for(ServerEventQueue queue : localEventQueues) {

src/firefly/java/edu/caltech/ipac/firefly/server/servlets/ServerStatus.java

+13
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import edu.caltech.ipac.firefly.core.RedisService;
77
import edu.caltech.ipac.firefly.core.background.JobManager;
8+
import edu.caltech.ipac.firefly.messaging.Messenger;
89
import edu.caltech.ipac.firefly.server.Counters;
910
import edu.caltech.ipac.firefly.server.ServerContext;
1011
import edu.caltech.ipac.firefly.server.cache.EhcacheProvider;
@@ -28,6 +29,8 @@
2829
import java.io.PrintWriter;
2930
import java.rmi.RemoteException;
3031
import java.text.SimpleDateFormat;
32+
import java.time.ZoneId;
33+
import java.time.format.DateTimeFormatter;
3134
import java.util.Arrays;
3235
import java.util.Base64;
3336
import java.util.Collections;
@@ -314,6 +317,16 @@ private static void showMessagingStatus(PrintWriter w) {
314317
w.println("----------------- ");
315318
Map<String, Object> stats = RedisService.getStats();
316319
stats.forEach((k,v)-> w.println(" - " + k + ": " + v));
320+
321+
w.println("\nMessenger info: ");
322+
w.println("----------------- ");
323+
w.println(" Subscribed Topics: " + Messenger.getSubscribedTopics());
324+
for (Map.Entry<String, Messenger.SubscriberHandler> entry : Messenger.getSubscribers().entrySet()) {
325+
String topic = entry.getKey();
326+
Messenger.SubscriberHandler handler = entry.getValue();
327+
w.println(" - Topic: " + "%-20s".formatted(topic) + " Status: " + (String.format(handler.getFailSince() == null ? "OK" :
328+
"Failed since " + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault()).format(handler.getFailSince()))));
329+
}
317330
}
318331

319332
private static void showPackagingStatus(PrintWriter w, boolean details) {

0 commit comments

Comments
 (0)