Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ log
.classpath
.project
.metadata
RemoteSystemsTempFiles
RemoteSystemsTempFiles
harpc.iml
4 changes: 2 additions & 2 deletions java/harpc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public static List<ServerNode> transfer(String address) {
String[] hostname = address.split(":");
int weight = DEFAULT_WEIGHT;
if (hostname.length == 3) {
weight = Integer.valueOf(hostname[2]);
weight = Integer.parseInt(hostname[2]);
}
String ip = hostname[0];
Integer port = Integer.valueOf(hostname[1]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class ResourceUtils {
private static final Object SYS_OBJECT = new Object();

/** {@link ClassLoader} */
private static ClassLoader classLoader;
private static volatile ClassLoader classLoader;

/**
* 从项目,jar或文件系统中读取指定路径的文件<br />
Expand Down Expand Up @@ -150,23 +150,26 @@ public static ClassLoader getDefaultClassLoader() {
}

synchronized (SYS_OBJECT) {
if (classLoader == null) {
ClassLoader tempClassLoader = null;
try {
tempClassLoader = Thread.currentThread().getContextClassLoader();
} catch (Exception ex) {
// Cannot access thread context ClassLoader - falling back
// to system class loader...
}

if (tempClassLoader == null) {
// No thread context class loader -> use class loader of
// this class.
tempClassLoader = ResourceUtils.class.getClassLoader();
}

classLoader = tempClassLoader;
if (classLoader != null) {
return classLoader;
}

ClassLoader tempClassLoader = null;
try {
tempClassLoader = Thread.currentThread().getContextClassLoader();
} catch (Exception ex) {
// Cannot access thread context ClassLoader - falling back
// to system class loader...
}

if (tempClassLoader == null) {
// No thread context class loader -> use class loader of
// this class.
tempClassLoader = ResourceUtils.class.getClassLoader();
}

classLoader = tempClassLoader;

}

return classLoader;
Expand Down
20 changes: 15 additions & 5 deletions java/harpc/src/main/java/com/bfd/harpc/config/RegistryConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.bfd.harpc.RpcException;
import com.bfd.harpc.common.Constants;

import java.io.UnsupportedEncodingException;

/**
* 注册中心配置
* <p>
Expand Down Expand Up @@ -96,14 +98,22 @@ private CuratorFramework create() throws RpcException {
* @param namespace
* @return {@link CuratorFramework}
*/
private CuratorFramework create(String connectString, Integer sessionTimeout, String namespace, int retry) {
private CuratorFramework create(String connectString, Integer sessionTimeout, String namespace, int retry){
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
if (StringUtils.isNotEmpty(auth)) {
builder.authorization("digest", auth.getBytes());
try {
builder.authorization("digest", auth.getBytes("utf-8"));
} catch (UnsupportedEncodingException e) {
//Assert never throw
e.printStackTrace();
}
}
return builder.connectString(connectString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(sessionTimeout).namespace(namespace).retryPolicy(new ExponentialBackoffRetry(
1000,
retry)).defaultData(null).build();
return builder.connectString(connectString)
.sessionTimeoutMs(sessionTimeout)
.connectionTimeoutMs(sessionTimeout)
.namespace(namespace)
.retryPolicy(new ExponentialBackoffRetry(1000,retry))
.defaultData(null).build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,19 @@ public Object createProxy() throws Exception {
if (applicationContext != null) {
Map<String, RegistryConfig> regMap = applicationContext.getBeansOfType(RegistryConfig.class);
if (regMap != null && regMap.size() > 0) {

for(RegistryConfig config:regMap.values()){
if ( config != null ){
try {
zkClient = config.obtainZkClient();
registry = new ZkClientRegistry(getService(), zkClient, clientNode);
} catch (Exception e) {
throw new RpcException("Registry error!", e);
}
break;
}
}
/* 改进一些微小的性能
for (String key : regMap.keySet()) {
if (regMap.get(key) != null) {
try {
Expand All @@ -115,7 +128,7 @@ public Object createProxy() throws Exception {
}
break;
}
}
} */
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,21 @@ public void export() throws ClassNotFoundException, RpcException {
if (applicationContext != null) {
Map<String, RegistryConfig> regMap = applicationContext.getBeansOfType(RegistryConfig.class);
if (regMap != null && regMap.size() > 0) {
for(RegistryConfig config : regMap.values()){
if ( config != null ){
try {
zkClient = config.obtainZkClient();
auth = config.getAuth();
if (StringUtils.isEmpty(auth)) {
throw new RpcException(RpcException.CONFIG_EXCEPTION, "The params 'auth' cannot empty!");
}
} catch (Exception e) {
throw new RpcException("Registry error!", e);
}
break;
}
}
/* 一些小的性能改进
for (String key : regMap.keySet()) {
if (regMap.get(key) != null) {
try {
Expand All @@ -117,6 +132,7 @@ public void export() throws ClassNotFoundException, RpcException {
break;
}
}
*/
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,7 @@ private void sendToZookeeper(ServerNode serverNode, StatisticsInfo info, boolean
statMap.get(serverNode).setTotal(info);
} else {
statMap.putIfAbsent(serverNode, new StatisticsTotal());
Map<String, StatisticsInfo> map = new HashMap<String, StatisticsInfo>();
map.put(timeStamp, info);

statMap.get(serverNode).getDetail().add(info);
// 调整节点数目
StatisticsHelper.adjustNodesByLimit(statMap.get(serverNode).getDetail(), NODE_COUNT_LIMIT);
Expand All @@ -213,9 +212,9 @@ private void sendToZookeeper(ServerNode serverNode, StatisticsInfo info, boolean
try {
// 创建节点并添加信息
if (zkClient.checkExists().forPath(path) != null) {
zkClient.setData().forPath(path, jsonString.getBytes());
zkClient.setData().forPath(path, jsonString.getBytes("utf-8"));
} else {
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, jsonString.getBytes());
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, jsonString.getBytes("utf-8"));
}

} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public static void adjustNodesByLimit(CuratorFramework zookeeper, String parentP
Long min = Long.MAX_VALUE;
for (String s : list) {
if (s.length() > length) {
Long tempValue = new Long(s.substring(length));
Long tempValue = Long.valueOf(s.substring(length));
if (tempValue < min)
min = tempValue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public DefaultRegistry(String serverAddress) {
String[] hostnames = serverAddress.split(";");// "ip:port;ip:port"
for (String hostname : hostnames) {
String[] address = hostname.split(":");
hostSet.addServerInstance(new ServerNode(address[0], Integer.valueOf(address[1])));
hostSet.addServerInstance(new ServerNode(address[0], Integer.parseInt(address[1])));
}
}

Expand Down
12 changes: 6 additions & 6 deletions java/harpc/src/test/java/com/bfd/harpc/test/EchoServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ public class EchoServiceImpl implements EchoService.Iface {

@Override
public String echo(String msg) throws TException {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
return "hello " + msg;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.bfd.harpc.test.performence;

import com.bfd.harpc.config.ClientConfig;
import com.bfd.harpc.config.RegistryConfig;
import com.bfd.harpc.test.gen.EchoService;
import com.bfd.harpc.test.gen.MessageProtocol;
import org.apache.thrift.TException;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* Created by xwarrior on 16/6/7.
*/
public class ClientPerformenceTest {
public static void main(String[] args) throws Exception {
RegistryConfig registryConfig = new RegistryConfig();
registryConfig.setConnectstr("127.0.0.1:2181");
//registryConfig.setConnectstr("172.18.1.22:2181");

String iface = EchoService.Iface.class.getName();
// String iface = MessageProtocol.class.getName();
ClientConfig<MessageProtocol> clientConfig = new ClientConfig<>();
clientConfig.setService("com.bfd.harpc.test$EchoService");
clientConfig.setIface(iface);
clientConfig.setProtocol("thrift");
// clientConfig.setProtocol("avro");
clientConfig.setHeartbeat(2000);
clientConfig.setMonitor(true);
clientConfig.setInterval(60);
clientConfig.setRetry(0);

final EchoService.Iface echo = (EchoService.Iface) clientConfig.createProxy(registryConfig);

long startTime = System.currentTimeMillis();


ThreadPoolExecutor pool = new ThreadPoolExecutor(
50,50,100,TimeUnit.SECONDS,
new LinkedBlockingDeque<>());

final Runnable r = new Runnable() {
@Override
public void run() {
try {
echo.echo("world!");
} catch (TException e) {
e.printStackTrace();
}
}
};

for(int i=0; i < 10000000;i++) {
pool.submit(r);
}

pool.shutdown();
pool.awaitTermination(Integer.MAX_VALUE,TimeUnit.DAYS);

long used = System.currentTimeMillis() - startTime;
System.out.println("10000000.0 times query used millseconds:" + used + ",avg qps:" + ( 10000000.0 / (used / 1000.0 ) ) );

}
}