Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
<version>1.0.0</version>
<modules>
<module>csv-file-adapter</module>
<module>redis-adapter</module>
</modules>

<properties>
Expand Down
26 changes: 26 additions & 0 deletions redis-adapter/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>gateway-examples</artifactId>
<groupId>com.diffusiondata.gateway.adapter</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>redis-adapter</artifactId>
<packaging>pom</packaging>
<modules>
<module>redis-sink</module>
<module>redis-source</module>
</modules>

<dependencies>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.2.1.RELEASE</version>
</dependency>
</dependencies>
</project>
29 changes: 29 additions & 0 deletions redis-adapter/redis-sink/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Redis Sink Adapter

## Introduction
This adapter can be used to get updates from Diffusion JSON topics and write the update to a Redis instance. It supports only one type of service.

### REDIS_SINK
This sink service supports getting Diffusion JSON topic updates and publishing them to a Redis instance specified in the configuration. This service will correctly function only for Diffusion topics of JSON topic type. If any other type of topic selector is used in its configuration, or the topic selector matches any non JSON topic type, when an update is received for this topic, Payload convertor exception will be thrown. This service type requires the following configuration to be declared in each defined service in the configuration file:

"application": {
"redisUrl": "redisUrl"
}

Below is an example of an overall configuration of a service of type `REDIS_SINK`:

{
"serviceName": "dataSelectorSink",
"serviceType": "REDIS_SINK",
"description": "Subscribes to JSON diffusion topic and writes its content to Redis instance",
"config": {
"framework": {
"diffusionTopicSelector": "?data//"
},
"application": {
"redisUrl": "redis://password@localhost:6379/"
}
}
}

With this configuration, this service will subscribe to all topics that match the selectors passed in `diffusionTopicSelectors` field. For each topic path update, a Redis entry will be created with tthe topic path as the key and the content, in the form a JSON string, as the value.
35 changes: 35 additions & 0 deletions redis-adapter/redis-sink/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>redis-adapter</artifactId>
<groupId>com.diffusiondata.gateway.adapter</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>redis-sink</artifactId>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.diffusiondata.gateway.adapter.redis.sink.Runner</mainClass>
</transformer>
</transformers>
<finalName>redis-sink-${project.version}</finalName>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.diffusiondata.gateway.adapter.redis.sink;

import static com.diffusiondata.gateway.framework.DiffusionGatewayFramework.newApplicationDetailsBuilder;

import com.diffusiondata.gateway.framework.GatewayApplication;
import com.diffusiondata.gateway.framework.ServiceDefinition;
import com.diffusiondata.gateway.framework.ServiceMode;
import com.diffusiondata.gateway.framework.StateHandler;
import com.diffusiondata.gateway.framework.exceptions.ApplicationConfigurationException;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Main Gateway Application implementation for Redis sink application.
*
* @author Diffusion Data
*/
final public class RedisSinkApplication implements GatewayApplication {

@Override public ApplicationDetails getApplicationDetails()
throws ApplicationConfigurationException {
return newApplicationDetailsBuilder()
.addServiceType(
"REDIS_SINK",
ServiceMode.SINK,
"A sink service which writes received string updates from " +
"configured Diffusion topics to a Redis instance",
"{\n" +
" \"$schema\": \"http://json-schema" +
".org/draft-07/schema#\",\n" +
" \"$ref\": \"#/definitions/application\",\n" +
" \"definitions\": {\n" +
" \"application\": {\n" +
" \"type\": \"object\",\n" +
" \"additionalProperties\": false,\n" +
" \"properties\": {\n" +
" \"redisUrl\": {\n" +
" \"type\": \"string\"\n" +
" }\n" +
" },\n" +
" \"required\": [\n" +
" \"redisUrl\"\n" +
" ]\n" +
" }\n" +
" }\n" +
"}")
.build("REDIS_SINK", 1);
}

@Override
public RedisSinkHandler addSink(
ServiceDefinition serviceDefinition,
StateHandler stateHandler) {

final Map<String, Object> parameters =
serviceDefinition.getParameters();

final String redisUrl = (String) parameters.get("redisUrl");

return new RedisSinkHandler(redisUrl);
}

@Override public CompletableFuture<?> stop() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.diffusiondata.gateway.adapter.redis.sink;

import static com.diffusiondata.gateway.framework.DiffusionGatewayFramework.newSinkServicePropertiesBuilder;
import static java.util.concurrent.CompletableFuture.completedFuture;

import com.diffusiondata.gateway.framework.SinkHandler;
import com.diffusiondata.gateway.framework.TopicType;
import com.diffusiondata.gateway.framework.exceptions.InvalidConfigurationException;

import java.util.concurrent.CompletableFuture;

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.async.RedisAsyncCommands;
import net.jcip.annotations.Immutable;

/**
* Redis sink handler to write received string update into a Redis instance.
*
* @author Diffusion Data
*/
@Immutable
final class RedisSinkHandler implements SinkHandler<String> {

private final RedisAsyncCommands<String, String> redisClient;

public RedisSinkHandler(String redisUrl) {
redisClient = RedisClient
.create(redisUrl)
.connect()
.async();
}

@Override
public SinkServiceProperties getSinkServiceProperties()
throws InvalidConfigurationException {
return newSinkServicePropertiesBuilder()
.topicType(TopicType.JSON)
.payloadConvertorName("$Default_JSON")
.build();
}

@Override
public CompletableFuture<?> update(String diffusionTopic, String value) {
return redisClient.set(diffusionTopic, value).toCompletableFuture();
}

@Override
public CompletableFuture<?> pause(PauseReason pauseReason) {
return completedFuture(null);
}

@Override
public CompletableFuture<?> resume(ResumeReason resumeReason) {
return completedFuture(null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.diffusiondata.gateway.adapter.redis.sink;

import static com.diffusiondata.gateway.framework.DiffusionGatewayFramework.initialize;

/**
* Main Runner class.
*
* @author Diffusion Data
*/
public class Runner {
public static void main(String[] args) {
final RedisSinkApplication redisSinkApplication =
new RedisSinkApplication();

initialize(redisSinkApplication)
.connect();


}
}
26 changes: 26 additions & 0 deletions redis-adapter/redis-sink/src/main/resources/configuration.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"id": "redis-adapter-1",
"framework-version": 1,
"application-version": 1,
"diffusion": {
"url": "ws://localhost:8080",
"principal": "admin",
"password": "password",
"reconnectIntervalMs": 5000
},
"services": [
{
"serviceName": "dataSelectorSink",
"serviceType": "REDIS_SINK",
"description": "Subscribes to 'data' diffusion topic selector and stores its content to a Redis instance",
"config": {
"framework": {
"diffusionTopicSelector": "?data//"
},
"application": {
"redisUrl": "redis://password@localhost:6379/"
}
}
}
]
}
27 changes: 27 additions & 0 deletions redis-adapter/redis-source/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Redis Source Adapter

## Introduction
This adapter can be used to read from a Redis instance and publish its contents to a Diffusion topic. It supports two types of service.
<ol>
<li> POLLING_REDIS_SOURCE

A service of this type is called by the framework in the interval set in the configuration file or at a default interval of 30 secs, if not specified in the configuration. This service type requires the following configuration to be declared in each defined service in the configuration file:

"application":
{
"redisUrl": <redisUrl>,
"diffusionTopicName": <diffusionTopicToPublishDataTo>
}
</li>
<li>
STREAMING_REDIS_SOURCE

A service of this type is used to subscribe to a Redis channel. When a message is received, the application will publish the value to a Diffusion topic using the channel key as the topic path. This service type requires the following configuration to be declared for each defined service in the configuration file:

"application":
{
"redisUrl": <redisUrl>,
"diffusionTopicName": <diffusionTopicToPublishDataTo>
}
</li>
</ol>
36 changes: 36 additions & 0 deletions redis-adapter/redis-source/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>redis-adapter</artifactId>
<groupId>com.diffusiondata.gateway.adapter</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>redis-source</artifactId>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.diffusiondata.gateway.adapter.redis.source.Runner</mainClass>
</transformer>
</transformers>
<finalName>redis-source-${project.version}</finalName>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Loading