Skip to content
23 changes: 21 additions & 2 deletions ncoap-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
</repository>
</repositories>

<properties>
<netty.version>4.0.37.Final</netty.version>
</properties>

<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
Expand All @@ -34,8 +38,23 @@
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>3.9.0.Final</version>
<artifactId>netty-transport</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,19 @@
*/
package de.uzl.itm.ncoap.application;

import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import de.uzl.itm.ncoap.communication.AbstractCoapChannelHandler;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.FixedReceiveBufferSizePredictor;
import org.jboss.netty.channel.socket.DatagramChannel;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
import org.jboss.netty.util.ThreadNameDeterminer;
import org.jboss.netty.util.ThreadRenamingRunnable;

import java.net.InetSocketAddress;
import java.util.concurrent.*;

import de.uzl.itm.ncoap.communication.AbstractCoapChannelHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.util.concurrent.DefaultThreadFactory;

/**
* The abstract base class for all kinds of CoAP applications, i.e. clients, servers, and endpoints (combining client
Expand All @@ -59,7 +56,7 @@ public abstract class AbstractCoapApplication {
*/
public static final int NOT_BOUND = -1;

private ScheduledThreadPoolExecutor executor;
private EventLoopGroup executor;
private DatagramChannel channel;
private String applicationName;

Expand All @@ -73,20 +70,9 @@ protected AbstractCoapApplication(String applicationName) {

this.applicationName = applicationName;

ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat(applicationName + " I/O Worker #%d").build();

ThreadRenamingRunnable.setThreadNameDeterminer(new ThreadNameDeterminer() {
@Override
public String determineThreadName(String currentThreadName, String proposedThreadName) throws Exception {
return null;
}
});

// determine number of I/O threads and create thread pool executor of that size
int ioThreads = Math.max(Runtime.getRuntime().availableProcessors() * 2, 4);
this.executor = new ScheduledThreadPoolExecutor(ioThreads, threadFactory);
// this.executor = new SynchronizedExecutor(ioThreads, threadFactory);
this.executor = new NioEventLoopGroup(ioThreads, new DefaultThreadFactory(applicationName + "I/O Worker"));
}

/**
Expand All @@ -97,40 +83,28 @@ public String determineThreadName(String currentThreadName, String proposedThrea
* @param localSocket the socket address to be used for inbound and outbound messages
*/
protected void startApplication(CoapChannelPipelineFactory pipelineFactory, InetSocketAddress localSocket) {
//ChannelFactory channelFactory = new NioDatagramChannelFactory(executor, executor.getCorePoolSize() / 2 );
ChannelFactory channelFactory = new NioDatagramChannelFactory(executor, 1 );

//System.out.println("Threads: " + (executor.getCorePoolSize() - 1));
//Create and configure bootstrap
ConnectionlessBootstrap bootstrap = new ConnectionlessBootstrap(channelFactory);
bootstrap.setPipelineFactory(pipelineFactory);
bootstrap.setOption("receiveBufferSizePredictor",
new FixedReceiveBufferSizePredictor(RECEIVE_BUFFER_SIZE));
Bootstrap bootstrap = new Bootstrap()
.channel(NioDatagramChannel.class)
.group(executor)
.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(RECEIVE_BUFFER_SIZE))
.handler(pipelineFactory);

//Create datagram channel
this.channel = (DatagramChannel) bootstrap.bind(localSocket);

// set the channel handler contexts
for (ChannelHandler handler : pipelineFactory.getChannelHandlers()) {
if (handler instanceof AbstractCoapChannelHandler) {
ChannelHandlerContext context = this.channel.getPipeline().getContext(handler.getClass());
((AbstractCoapChannelHandler) handler).setContext(context);
}
}
this.channel = (DatagramChannel) bootstrap.bind(localSocket).awaitUninterruptibly().channel();
}

/**
* Returns the local port number the {@link org.jboss.netty.channel.socket.DatagramChannel} of this
* Returns the local port number the {@link DatagramChannel} of this
* {@link de.uzl.itm.ncoap.application.client.CoapClient} is bound to or
* {@link #NOT_BOUND} if the application has not yet been started.
*
* @return the local port number the {@link org.jboss.netty.channel.socket.DatagramChannel} of this
* @return the local port number the {@link DatagramChannel} of this
* {@link de.uzl.itm.ncoap.application.client.CoapClient} is bound to or
* {@link #NOT_BOUND} if the application has not yet been started.
*/
public int getPort() {
try {
return this.channel.getLocalAddress().getPort();
return this.channel.localAddress().getPort();
} catch(Exception ex) {
return NOT_BOUND;
}
Expand All @@ -147,7 +121,7 @@ public int getPort() {
* {@link de.uzl.itm.ncoap.application.AbstractCoapApplication} to handle tasks, e.g. write and
* receive messages.
*/
public ScheduledExecutorService getExecutor() {
public EventLoopGroup getExecutor() {
return this.executor;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,35 +24,34 @@
*/
package de.uzl.itm.ncoap.application;

import de.uzl.itm.ncoap.communication.codec.CoapMessageDecoder;
import de.uzl.itm.ncoap.communication.codec.CoapMessageEncoder;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.execution.ExecutionHandler;
import java.util.LinkedHashSet;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.ScheduledExecutorService;
import de.uzl.itm.ncoap.communication.codec.CoapMessageDecoder;
import de.uzl.itm.ncoap.communication.codec.CoapMessageEncoder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;

/**
* Abstract base class for pipeline factories for clients, servers and peers.
*
* @author Oliver Kleine
*/
public abstract class CoapChannelPipelineFactory implements ChannelPipelineFactory {
public abstract class CoapChannelPipelineFactory extends ChannelInitializer<Channel> {

private static Logger LOG = LoggerFactory.getLogger(CoapChannelPipelineFactory.class.getName());

private Set<ChannelHandler> channelHandlers;


protected CoapChannelPipelineFactory(ScheduledExecutorService executor) {
protected CoapChannelPipelineFactory() {
this.channelHandlers = new LinkedHashSet<>();

addChannelHandler(new ExecutionHandler(executor));
addChannelHandler(new CoapMessageEncoder());
addChannelHandler(new CoapMessageDecoder());
}
Expand All @@ -67,16 +66,15 @@ public Set<ChannelHandler> getChannelHandlers () {
}

@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
protected void initChannel(Channel channel) throws Exception
{
ChannelPipeline pipeline = channel.pipeline();

for(ChannelHandler handler : this.channelHandlers) {
String handlerName = handler.getClass().getSimpleName();
pipeline.addLast(handler.getClass().getSimpleName(), handler);
LOG.debug("Added Handler to Pipeline: {}.", handlerName);
}

return pipeline;
}

// /**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@
*/
package de.uzl.itm.ncoap.application.client;

import java.net.InetSocketAddress;

import de.uzl.itm.ncoap.communication.blockwise.BlockSize;
import de.uzl.itm.ncoap.message.CoapResponse;
import org.jboss.netty.buffer.ChannelBuffer;
import de.uzl.itm.ncoap.message.options.Option;

import java.net.InetSocketAddress;


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
*/
package de.uzl.itm.ncoap.application.client;

import java.util.concurrent.ScheduledExecutorService;

import de.uzl.itm.ncoap.application.CoapChannelPipelineFactory;
import de.uzl.itm.ncoap.communication.blockwise.client.ClientBlock1Handler;
import de.uzl.itm.ncoap.communication.blockwise.client.ClientBlock2Handler;
Expand All @@ -34,10 +36,8 @@
import de.uzl.itm.ncoap.communication.reliability.inbound.ClientInboundReliabilityHandler;
import de.uzl.itm.ncoap.communication.reliability.outbound.ClientOutboundReliabilityHandler;
import de.uzl.itm.ncoap.communication.reliability.outbound.MessageIDFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.socket.DatagramChannel;

import java.util.concurrent.ScheduledExecutorService;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.DatagramChannel;


/**
Expand All @@ -56,7 +56,7 @@ public class ClientChannelPipelineFactory extends CoapChannelPipelineFactory {
*/
public ClientChannelPipelineFactory(ScheduledExecutorService executor) {

super(executor);
super();
addChannelHandler(new ClientIdentificationHandler(executor));
addChannelHandler(new ClientOutboundReliabilityHandler(executor, new MessageIDFactory(executor)));
addChannelHandler(new ClientInboundReliabilityHandler(executor));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,19 @@
*/
package de.uzl.itm.ncoap.application.client;

import java.net.InetSocketAddress;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import de.uzl.itm.ncoap.application.AbstractCoapApplication;
import de.uzl.itm.ncoap.communication.blockwise.BlockSize;
import de.uzl.itm.ncoap.communication.dispatching.client.ResponseDispatcher;
import de.uzl.itm.ncoap.message.CoapMessage;
import de.uzl.itm.ncoap.message.CoapRequest;
import de.uzl.itm.ncoap.message.CoapResponse;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.socket.DatagramChannel;

/**
* An instance of {@link CoapClient} is the entry point to send {@link CoapMessage}s to a (remote)
Expand Down Expand Up @@ -98,7 +99,7 @@ public CoapClient(String name, InetSocketAddress clientSocket) {
ClientChannelPipelineFactory factory = new ClientChannelPipelineFactory(this.getExecutor());
startApplication(factory, clientSocket);

this.responseDispatcher = getChannel().getPipeline().get(ResponseDispatcher.class);
this.responseDispatcher = getChannel().pipeline().get(ResponseDispatcher.class);
}


Expand Down Expand Up @@ -150,8 +151,8 @@ public void sendCoapPing(InetSocketAddress remoteSocket, ClientCallback callback

/**
* Shuts this {@link CoapClient} down by closing its
* {@link org.jboss.netty.channel.socket.DatagramChannel} which includes to unbind
* this {@link org.jboss.netty.channel.socket.DatagramChannel} from the listening port and by this means free the
* {@link DatagramChannel} which includes to unbind
* this {@link DatagramChannel} from the listening port and by this means free the
* port.
*/
public final void shutdown() {
Expand All @@ -161,10 +162,10 @@ public final void shutdown() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
LOG.warn("Channel closed ({}).", CoapClient.this.getApplicationName());
getChannel().getFactory().releaseExternalResources();
LOG.warn("External resources released ({}).", CoapClient.this.getApplicationName());
LOG.warn("Shutdown of " + getApplicationName() + " completed.");
}
});
getExecutor().shutdownGracefully();
}
}
Loading