Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keep opensergoclient alive #7

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/main/java/io/opensergo/ConfigKind.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ public enum ConfigKind {
FAULT_TOLERANCE_RULE("fault-tolerance.opensergo.io/v1alpha1/FaultToleranceRule", "FaultToleranceRule"),
RATE_LIMIT_STRATEGY("fault-tolerance.opensergo.io/v1alpha1/RateLimitStrategy", "RateLimitStrategy"),
THROTTLING_STRATEGY("fault-tolerance.opensergo.io/v1alpha1/ThrottlingStrategy", "ThrottlingStrategy"),
CONCURRENCY_LIMIT_STRATEGY("fault-tolerance.opensergo.io/v1alpha1/ConcurrencyLimitStrategy",
"ConcurrencyLimitStrategy"),
CONCURRENCY_LIMIT_STRATEGY("fault-tolerance.opensergo.io/v1alpha1/ConcurrencyLimitStrategy", "ConcurrencyLimitStrategy"),
CIRCUIT_BREAKER_STRATEGY("fault-tolerance.opensergo.io/v1alpha1/CircuitBreakerStrategy", "CircuitBreakerStrategy");

private final String kindName;
Expand Down
70 changes: 63 additions & 7 deletions src/main/java/io/opensergo/OpenSergoClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import io.opensergo.util.AssertUtils;
import io.opensergo.util.IdentifierUtils;

import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand All @@ -46,6 +48,7 @@ public class OpenSergoClient implements AutoCloseable {
private final SubscribeRegistry subscribeRegistry;

private AtomicInteger reqId;
protected volatile OpenSergoClientStatus status;

public OpenSergoClient(String host, int port) {
this.channel = ManagedChannelBuilder.forAddress(host, port)
Expand All @@ -56,17 +59,67 @@ public OpenSergoClient(String host, int port) {
this.configCache = new SubscribedConfigCache();
this.subscribeRegistry = new SubscribeRegistry();
this.reqId = new AtomicInteger(0);
status = OpenSergoClientStatus.INITIAL;
}

public void start() throws Exception {
OpenSergoLogger.info("OpensergoClient is starting...");

if (status == OpenSergoClientStatus.INITIAL) {
OpenSergoLogger.info("open keepavlive thread");
Thread keepAliveThread = new Thread(this::keepAlive);
keepAliveThread.setName("thread-opensergo-keepalive-" + keepAliveThread.getId());
keepAliveThread.setDaemon(true);
keepAliveThread.start();
}

status = OpenSergoClientStatus.STARTING;

this.requestAndResponseWriter = transportGrpcStub.withWaitForReady()
.subscribeConfig(new OpenSergoSubscribeClientObserver(configCache, subscribeRegistry));
.subscribeConfig(new OpenSergoSubscribeClientObserver(this));

OpenSergoLogger.info("begin to subscribe config-data...");
this.subscribeRegistry.getSubscriberKeysAll().forEach(subscribeKey -> {
this.subscribeConfig(subscribeKey);
});

OpenSergoLogger.info("openSergoClient is started");
status = OpenSergoClientStatus.STARTED;
}

private void keepAlive() {
// TODO change to event-based design, instead of for-loop.
for (;;) {
if (status == OpenSergoClientStatus.SHUTDOWN) {
return;
}

try {
if (status == OpenSergoClientStatus.INTERRUPTED) {
OpenSergoLogger.info("try to restart openSergoClient...");
this.start();
}
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
} catch (InterruptedException e) {
OpenSergoLogger.error(e.toString(), e);
} catch (Exception e) {
try {
this.close();
} catch (Exception ex) {
status = OpenSergoClientStatus.SHUTDOWN;
}
OpenSergoLogger.error("close OpenSergoClient because of " + e, e);
}
}
}

@Override
public void close() throws Exception {
requestAndResponseWriter.onCompleted();

// stop the keepAliveThread
status = OpenSergoClientStatus.SHUTDOWN;

// gracefully drain the requests, then close the connection
channel.shutdown();
}
Expand All @@ -77,8 +130,8 @@ public boolean unsubscribeConfig(SubscribeKey subscribeKey) {
AssertUtils.assertNotNull(subscribeKey.getKind(), "kind cannot be null");

if (requestAndResponseWriter == null) {
// TODO: return status that indicates not ready
throw new IllegalStateException("gRPC stream is not ready");
OpenSergoLogger.error("Fatal error occurred on OpenSergo gRPC ClientObserver", new IllegalStateException("gRPC stream is not ready"));
status = OpenSergoClientStatus.INTERRUPTED;
}
SubscribeRequestTarget subTarget = SubscribeRequestTarget.newBuilder()
.setNamespace(subscribeKey.getNamespace()).setApp(subscribeKey.getApp())
Expand Down Expand Up @@ -106,8 +159,8 @@ public boolean subscribeConfig(SubscribeKey subscribeKey, OpenSergoConfigSubscri
AssertUtils.assertNotNull(subscribeKey.getKind(), "kind cannot be null");

if (requestAndResponseWriter == null) {
// TODO: return status that indicates not ready
throw new IllegalStateException("gRPC stream is not ready");
OpenSergoLogger.error("Fatal error occurred on OpenSergo gRPC ClientObserver", new IllegalStateException("gRPC stream is not ready"));
status = OpenSergoClientStatus.INTERRUPTED;
}
SubscribeRequestTarget subTarget = SubscribeRequestTarget.newBuilder()
.setNamespace(subscribeKey.getNamespace()).setApp(subscribeKey.getApp())
Expand All @@ -124,8 +177,7 @@ public boolean subscribeConfig(SubscribeKey subscribeKey, OpenSergoConfigSubscri
// Register subscriber to local.
if (subscriber != null) {
subscribeRegistry.registerSubscriber(subscribeKey, subscriber);
OpenSergoLogger.info("OpenSergo config subscriber registered, subscribeKey={}, subscriber={}",
subscribeKey, subscriber);
OpenSergoLogger.info("OpenSergo config subscriber registered, subscribeKey={}, subscriber={}", subscribeKey, subscriber);
}

return true;
Expand All @@ -135,4 +187,8 @@ public SubscribedConfigCache getConfigCache() {
return configCache;
}

public SubscribeRegistry getSubscribeRegistry() {
return subscribeRegistry;
}

}
30 changes: 30 additions & 0 deletions src/main/java/io/opensergo/OpenSergoClientStatus.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2022, OpenSergo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opensergo;

/**
* @author Jiangnan Jia
**/
public enum OpenSergoClientStatus {

/* initial*/
INITIAL,
STARTING,
STARTED,
INTERRUPTED,
SHUTDOWN

}
22 changes: 11 additions & 11 deletions src/main/java/io/opensergo/OpenSergoSubscribeClientObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import io.opensergo.proto.transport.v1.SubscribeResponse;
import io.opensergo.subscribe.OpenSergoConfigSubscriber;
import io.opensergo.subscribe.SubscribeKey;
import io.opensergo.subscribe.SubscribeRegistry;
import io.opensergo.subscribe.SubscribedConfigCache;
import io.opensergo.util.StringUtils;

/**
Expand All @@ -41,13 +39,10 @@ public class OpenSergoSubscribeClientObserver implements ClientResponseObserver<

private ClientCallStreamObserver<SubscribeRequest> requestStream;

private final SubscribedConfigCache configCache;
private final SubscribeRegistry subscribeRegistry;
private OpenSergoClient openSergoClient;

public OpenSergoSubscribeClientObserver(SubscribedConfigCache configCache,
SubscribeRegistry subscribeRegistry) {
this.configCache = configCache;
this.subscribeRegistry = subscribeRegistry;
public OpenSergoSubscribeClientObserver(OpenSergoClient openSergoClient) {
this.openSergoClient = openSergoClient;
}

@Override
Expand All @@ -58,7 +53,7 @@ public void beforeStart(ClientCallStreamObserver<SubscribeRequest> requestStream
private LocalDataNotifyResult notifyDataChange(SubscribeKey subscribeKey, DataWithVersion dataWithVersion)
throws Exception {
long receivedVersion = dataWithVersion.getVersion();
SubscribedData cachedData = configCache.getDataFor(subscribeKey);
SubscribedData cachedData = this.openSergoClient.getConfigCache().getDataFor(subscribeKey);
if (cachedData != null && cachedData.getVersion() > receivedVersion) {
// The upcoming data is out-dated, so we'll not resolve the push request.
return new LocalDataNotifyResult().setCode(OpenSergoTransportConstants.CODE_ERROR_VERSION_OUTDATED);
Expand All @@ -67,9 +62,9 @@ private LocalDataNotifyResult notifyDataChange(SubscribeKey subscribeKey, DataWi
// Decode actual data from the raw "Any" data.
List<Object> dataList = decodeActualData(subscribeKey.getKind().getKindName(), dataWithVersion.getDataList());
// Update to local config cache.
configCache.updateData(subscribeKey, dataList, receivedVersion);
this.openSergoClient.getConfigCache().updateData(subscribeKey, dataList, receivedVersion);

List<OpenSergoConfigSubscriber> subscribers = subscribeRegistry.getSubscribersOf(subscribeKey);
List<OpenSergoConfigSubscriber> subscribers = this.openSergoClient.getSubscribeRegistry().getSubscribersOf(subscribeKey);
if (subscribers == null || subscribers.isEmpty()) {
// no-subscriber is acceptable (just for cache-and-pull mode)
return LocalDataNotifyResult.withSuccess(dataList);
Expand Down Expand Up @@ -178,6 +173,11 @@ private List<Object> decodeActualData(String kind, List<Any> rawList) throws Exc

@Override
public void onError(Throwable t) {
// TODO add handles for different io.grpc.Status of Throwable from ClientCallStreamObserver<SubscribeRequest>
io.grpc.Status.Code errorCode = io.grpc.Status.fromThrowable(t).getCode();
if(errorCode.equals(io.grpc.Status.UNAVAILABLE.getCode())) {
this.openSergoClient.status = OpenSergoClientStatus.INTERRUPTED;
}
OpenSergoLogger.error("Fatal error occurred on OpenSergo gRPC ClientObserver", t);
}

Expand Down
6 changes: 6 additions & 0 deletions src/main/java/io/opensergo/subscribe/SubscribeRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.opensergo.subscribe;

import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
Expand All @@ -33,9 +34,14 @@ public void registerSubscriber(SubscribeKey key, OpenSergoConfigSubscriber subsc
AssertUtils.assertNotNull(key, "subscribeKey cannot be null");
AssertUtils.assertNotNull(subscriber, "subscriber cannot be null");
List<OpenSergoConfigSubscriber> list = subscriberMap.computeIfAbsent(key, v -> new CopyOnWriteArrayList<>());
// TODO distinct the same OpenSergoConfigSubscriber
list.add(subscriber);
}

public Set<SubscribeKey> getSubscriberKeysAll() {
return subscriberMap.keySet();
}

public List<OpenSergoConfigSubscriber> getSubscribersOf(SubscribeKey key) {
if (key == null) {
return null;
Expand Down