
Develop #2


Open · wants to merge 13 commits into `master`
1 change: 1 addition & 0 deletions .gitignore
@@ -1,3 +1,4 @@
node_modules/*
example/data/RawData/*
example/data/StatData/*
mappings/tmp/*
30 changes: 16 additions & 14 deletions README.md
@@ -1,17 +1,15 @@
# Time Series server

The Live Time Series Server is an ongoing implementation that aims on providing a cost efficient interface for
Open Stream data publishing. Through an extensible modular architecture we allow data publishers to define
[multidimensional interfaces](http://ceur-ws.org/Vol-1666/paper-03.pdf) to provide query answering functionalities on top of their data.
The Live Time Series Server is an ongoing implementation that aims at providing a cost-efficient interface for Open Stream data publishing. Through an extensible modular architecture we allow data publishers to define [multidimensional interfaces](http://ceur-ws.org/Vol-1666/paper-03.pdf) that provide query answering functionalities on top of their data.

![Server Architecture](https://linkedtimeseries.github.io/timeseries-demo-paper/media/images/fig1.png)

Features:

* Allows to define custom interfaces to publish pre-processed summaries of the data.
* Modular and extensible architecture for implementig new features.
* Keeps and publishes history over HTTP using appropriate caching headers.
* Can expose HTTP and Websocket interfaces for communication with clients.
* Allows defining custom interfaces to publish pre-processed summaries of the data.
* Modular and extensible architecture for implementing new features.
* Keeps and publishes history over HTTP using appropriate caching headers.
* Can expose HTTP and Websocket interfaces for communication with clients.

A more detailed description can be found in this [demo paper](https://linkedtimeseries.github.io/timeseries-demo-paper/).

@@ -41,12 +39,14 @@ multidimensional interfaces (`RawData`, `StatisticalAverage` and `GeographicClas
```

### RawData

This is the default interface for the server as it takes the received stream updates and exposes them without any modification.
This interface also allows storing historic data as [Linked Data fragments](http://linkeddatafragments.org/). For this example
the data is exposed through HTTP using the `/RawData/latest` URL for the most recent data and the `/RawData/fragments{?time}` URL
to access historic data. It also exposes an optional Websocket interface to push the latest updates to subscribed clients.
Each interface can define its own configuration parameters according to its needs. For this specific implementation these are
the defined parameters:

```js
{
"name": "RawData", // Interface name. Used to define the HTTP URLs.
@@ -58,9 +58,12 @@ the defined parameters:
"maxFileSize": 100000 // Maximun size in Bytes of each historic data fragment.
}
```

### StatisticalAverage
This interface serves as an example of exposing precalculated values from the original data of the stream. On this concrete implementation we expose the arithmetic mean of the parking availability of each defined parking lot in the stream. The data can be accesed on different levels that follow a time-based dimension (i.e. `year`, `month`, `day` and `hour`). Each time a new data update is received, the servers proceeds to re-calculate the arithmetic mean values and updates them on each level. The server also adds metadata using the [Hydra](http://www.hydra-cg.com/spec/latest/core/) and [Multidimensional Interface](http://semweb.datasciencelab.be/ns/multidimensional-interface/#RangeGate) vocabularies to link the different levels together. Next there is a snippet example of the data that can be retrieved at a `month` level using the `TriG` format:
```

This interface serves as an example of exposing precalculated values derived from the original data of the stream. In this concrete implementation we expose the arithmetic mean of the parking availability of each defined parking lot in the stream. The data can be accessed at different levels that follow a time-based dimension (i.e. `year`, `month`, `day` and `hour`). Each time a new data update is received, the server proceeds to recalculate the arithmetic mean values and updates them at each level. The server also adds metadata using the [Hydra](http://www.hydra-cg.com/spec/latest/core/) and [Multidimensional Interface](http://semweb.datasciencelab.be/ns/multidimensional-interface/#RangeGate) vocabularies to link the different levels together. Below is an example snippet of the data that can be retrieved at the `month` level using the `TriG` format:

```trig
<http://localhost:8080/StatisticalAverage/fragment/2018_2019/02_03> {
<https://stad.gent/id/parking/P7> ts:mean "348".
<https://stad.gent/id/parking/P10> ts:mean "635".
@@ -82,18 +85,17 @@ This interface serves as an example of exposing precalculated values from the or
<http://localhost:8080/StatisticalAverage/fragment/2018_2019/03_04#mapping> hydra:variable "initial", "final";
hydra:property mdi:initial, mdi:final.
```
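As an illustration of the recalculation step described above, and not the server's actual code, a running arithmetic mean per parking lot can be updated incrementally whenever a new observation arrives:

```js
// Minimal sketch (not the server's implementation): keep an incremental
// arithmetic mean of the parking availability per parking lot for one time level.
const means = new Map(); // parking URI -> { count, mean }

function updateMean(parkingId, availableSpots) {
  const current = means.get(parkingId) || { count: 0, mean: 0 };
  current.count += 1;
  // Incremental formula: avoids keeping every observation in memory
  current.mean += (availableSpots - current.mean) / current.count;
  means.set(parkingId, current);
  return current.mean;
}

updateMean('https://stad.gent/id/parking/P7', 348);
console.log(updateMean('https://stad.gent/id/parking/P7', 352)); // 350
```

The server keeps similar aggregates for every level (`year`, `month`, `day` and `hour`) and serializes them into fragments like the one shown above.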

### GeographicClassification
Not implemented yet.
## Test it
To test the server a RDF stream can be piped into the server. We can pipe the example dataset into the server using the [replay-timeseries](https://www.npmjs.com/package/replay-timeseries)
tool, which allows to control the frequency of the updates. Follow the next steps to test the server after installation:
To test the server, an RDF stream can be piped into it. We can pipe the example dataset into the server using the [replay-timeseries](https://www.npmjs.com/package/replay-timeseries) tool, which allows controlling the frequency of the updates. Follow these steps to test the server after installation:
```bash
$ cd timeseries-server
$ npm install -g replay-timeseries
$ cat parking_data.trig | replay-timeseries -s 10x | node bin/timeseries-server.js -c config.json
```
As the original observations were made every 30 seconds, we use `replay-timeseries -s 10x` to replay them every 3 seconds
(10 times faster). This tool also rewrites the `prov:generatedAtTime` value to the current time for testing purposes.
As the original observations were made every 30 seconds, we use `replay-timeseries -s 10x` to replay them every 3 seconds (10 times faster). This tool also rewrites the `prov:generatedAtTime` value to the current time for testing purposes.
### HTTP Interfaces
To access the data you can use a polling approach through HTTP as follows:
#### RawData
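For example, assuming the default `hostName` and `httpPort` from `config.json` (`http://localhost:8080`), the most recent raw update can be polled with a few lines of Node:

```js
// Minimal polling sketch using Node's built-in http module
const http = require('http');

http.get('http://localhost:8080/RawData/latest', res => {
  let body = '';
  res.on('data', chunk => { body += chunk; });
  res.on('end', () => console.log(body)); // latest raw update
});
```

Historic fragments can be requested the same way through the `/RawData/fragments{?time}` URL documented above.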
@@ -116,7 +118,7 @@ Each level contains [Hydra](http://www.hydra-cg.com/spec/latest/core/) metadata
To access the data through Websockets you can execute the example Websocket [client](https://github.com/linkedtimeseries/timeseries-server/blob/master/lib/WebSocketClient.js)
provided with this implementation as follows:
```bash
$ cd timeseries-server
$ node lib/WebSocketClient.js
```
It subscribes to both of the Websocket channels defined by the `RawInterface` and the `StatisticalAverage` interface, and prints the data it receives from the server to the console.
42 changes: 27 additions & 15 deletions bin/timeseries-server.js
@@ -1,38 +1,50 @@
const CommunicationManager = require('../lib/CommunicationManager');
const Configuration = require('../lib/Configuration');
const SourceReader = require('../lib/SourceReader');
const DataEventManager = require('../lib/DataEventManager');


try {
// Read config file
let config = Configuration.getConfig(process.argv);
// Init Communication Manager module
let commManager = new CommunicationManager(config);

// Load multidimensional interfaces
loadInterfaceModules(config.interfaces, commManager);
// Process data source
for (let i in config.sources) {

// Load multidimensional interfaces
loadInterfaceModules(config.sources[i], commManager);

let source = new SourceReader(config.sources[i], config.hostName + config.liveUriPath);
source.on('data', data => {
console.log(`${new Date().toISOString()} – data mapped from ${config.sources[i].name}`);
// Launch data event towards predefined interfaces through Data Event Manager module
DataEventManager.push(`data-${config.sources[i].name}`, data);
});
}

//Listen for data on standard input
let stdin = process.openStdin();
stdin.on('data', chunk => {
// Launch data event towards predifined interfaces through Data Event Manager module
DataEventManager.push('data', chunk);
});
// TODO: Define a way to configure RDF input streams
// Listen for data on standard input
// let stdin = process.openStdin();
// stdin.on('data', chunk => writePerObservation(chunk));

// Launch Web server for polling interfaces
let app = commManager.app;
let router = commManager.router;
app.use(router.routes()).use(router.allowedMethods());
let http = commManager.http;
let ws = commManager.ws;
app.use(http.routes()).use(http.allowedMethods());
app.ws.use(ws.routes()).use(ws.allowedMethods());
app.listen(config.httpPort);

} catch (e) {
console.error(e);
process.exit(1);
}

function loadInterfaceModules(interfaces, commManager) {
for (let i in interfaces) {
let Interface = require(interfaces[i].path);
new Interface(interfaces[i], commManager);
function loadInterfaceModules(source, commManager) {
let int = Object.keys(source.interfaces);
for (let i in int) {
let Interface = require(process.cwd() + '/' + source.interfaces[int[i]]);
new Interface(source, commManager);
}
}
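For clarity, this is the contract that `loadInterfaceModules` expects from each entry in `source.interfaces`: the referenced file exports a class that is instantiated with the source configuration and the `CommunicationManager`. The sketch below is a hypothetical, minimal module illustrating that shape (the bundled interfaces such as `RawData` extend `MultidimensionalInterface` instead):

```js
// Hypothetical lib/interfaces/ExampleInterface.js, shown only to illustrate
// the module shape loaded by loadInterfaceModules().
const DataEventManager = require('../DataEventManager');

class ExampleInterface {
  constructor(source, commManager) {
    this._source = source;
    this._commManager = commManager;
    // React to the updates pushed for this source by bin/timeseries-server.js
    DataEventManager.subscribe(`data-${source.name}`, (...data) => this.onData(data));
  }

  onData(data) {
    console.log(`ExampleInterface got an update for ${this._source.name}`);
  }
}

module.exports = ExampleInterface;
```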
89 changes: 70 additions & 19 deletions config.json
@@ -1,28 +1,79 @@
{
"serverUrl": "http://localhost:8080/",
"hostName": "http://localhost:8080",
"httpPort": 8080,
"interfaces": [
"liveUriPath": "/data/live/",
"sources": [
{
"name": "RawData",
"path": "../lib/interfaces/RawData",
"websocket": true,
"wsPort": 3001,
"fragmentsPath": "./example/data/RawData",
"staticTriples": "./example/data/rawdata_static.trig",
"maxFileSize": 100000
"name": "kortrijk",
"sourceUrl": "https://stallingsnet.nl/api/3/parkingcount/kortrijk?apiKey=NzgxOlduaEFfU0t2Z2FicDRKaWh0bXVzNTFUN3pKUT0=",
"interfaces": {
"RawData": "lib/interfaces/RawData"
},
"outputPath": "/home/julian/data/velopark/live",
"refreshInterval": "*/10 * * * * *",
"staticTriples": "",
"metadata": "",
"maxFileSize": 5000,
"mappings": "./mappings/kortrijk.yml",
"context": "./mappings/context.json",
"idAlignment": {
"apiPath": "$..id",
"align": [
{
"value": "20-1",
"@id": "https://velopark.ilabt.imec.be/data/Parko_1#section1"
}
]
}
},
{
"name": "StatisticalAverage",
"path": "../lib/interfaces/StatisticalAverage",
"websocket": true,
"wsPort": 3002,
"fragmentsPath": "./example/data/StatData",
"staticTriples": "./example/data/statsdata_static.trig"
"name": "gent",
"sourceUrl": "https://data.stad.gent/api/records/1.0/search/?dataset=real-time-bezettingen-fietsenstallingen-gent",
"interfaces": {
"RawData": "lib/interfaces/RawData"
},
"outputPath": "/home/julian/data/velopark/live",
"refreshInterval": "*/5 * * * * *",
"metadata": "",
"maxFileSize": 5000,
"mappings": "mappings/gent.yml",
"context": "mappings/context.json",
"idAlignment": {
"apiPath": "$..id",
"align": [
{
"value": "48-1",
"@id": "https://velopark.ilabt.imec.be/data/De-Fietsambassade-Gent_Korenmarkt#section1"
},
{
"value": "48-2",
"@id": "https://velopark.ilabt.imec.be/data/De-Fietsambassade-Gent_Emile-Braun-Plein#section1"
}
]
}
},
{
"name": "GeographicClassification",
"path": "../lib/interfaces/GeographicClassification",
"websocket": false
"name": "vilvoorde",
"sourceUrl": "https://api.fietskluis-app.nl/public-api/locations/8b842ad4-6193-42fc-b1ee-591226fb2ce4",
"interfaces": {
"RawData": "lib/interfaces/RawData"
},
"outputPath": "/home/julian/data/velopark/live",
"refreshInterval": "*/15 * * * * *",
"staticTriples": "",
"metadata": "",
"maxFileSize": 5000,
"mappings": "mappings/vilvoorde.yml",
"context": "mappings/context.json",
"idAlignment": {
"apiPath": "$..id",
"align": [
{
"value": "8b842ad4-6193-42fc-b1ee-591226fb2ce4",
"@id": "https://velopark.ilabt.imec.be/data/Blue-Bike_bluelockervilvoorde#section1"
}
]
}
}
]
}
}
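The `idAlignment` block pairs raw identifiers found in the API response (located via the JSONPath in `apiPath`) with Velopark parking-section URIs. The actual alignment logic is not part of this diff; the sketch below is only a hypothetical illustration of how such a block could be interpreted:

```js
// Hypothetical helper: resolve the Velopark @id that corresponds to a raw
// identifier extracted from the source API (e.g. via the "$..id" JSONPath).
function alignId(idAlignment, rawValue) {
  const match = idAlignment.align.find(entry => entry.value === String(rawValue));
  return match ? match['@id'] : null;
}

// Using the "gent" source definition above
const gentAlignment = {
  apiPath: '$..id',
  align: [
    { value: '48-1', '@id': 'https://velopark.ilabt.imec.be/data/De-Fietsambassade-Gent_Korenmarkt#section1' },
    { value: '48-2', '@id': 'https://velopark.ilabt.imec.be/data/De-Fietsambassade-Gent_Emile-Braun-Plein#section1' }
  ]
};

console.log(alignId(gentAlignment, '48-1'));
// -> https://velopark.ilabt.imec.be/data/De-Fietsambassade-Gent_Korenmarkt#section1
```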
35 changes: 10 additions & 25 deletions lib/CommunicationManager.js
@@ -1,31 +1,16 @@
const Koa = require('koa');
const Router = require('koa-router');
const WS = require('ws');
const WS = require('koa-websocket');

class CommunicationManager {
constructor(config) {
this._config = config;
this._app = new Koa();
this._router = new Router();
this._webSockets = new Map();
}

setupWebSocket(name, port) {
let websocket = new WS.Server({ port: port });
this.webSockets.set(name, websocket);
}

pushData(name, data) {
if (this.webSockets.has(name)) {
this.webSockets.get(name).clients.forEach((client) => {
//Check if the connection is open
if (client.readyState === WS.OPEN) {
client.send(data);
}
});
} else {
throw new Error('There is no WebSocket interface defined for ' + name);
}
this._http = new Router();
// Websockify the app
this._app = WS(this.app);
// New router to handle websocket routes
this._ws = new Router();
}

get config() {
@@ -36,12 +21,12 @@ class CommunicationManager {
return this._app;
}

get router() {
return this._router;
get http() {
return this._http;
}

get webSockets() {
return this._webSockets;
get ws() {
return this._ws;
}
}
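With this refactoring the per-interface `wsPort` and the standalone `ws` server disappear: websocket traffic is handled by `koa-websocket` on the same HTTP port, and interfaces register routes on the `ws` router that `bin/timeseries-server.js` mounts via `app.ws.use(...)`. A minimal sketch of how an interface could use it (the route path is hypothetical):

```js
// Hypothetical use of the new ws router: register a channel and push updates
// to every connected client. With koa-websocket, ctx.websocket is the socket
// of the client that performed the upgrade request.
const clients = new Set();

function setupWebSocketRoute(commManager, name) {
  commManager.ws.get(`/${name}/live`, ctx => {
    clients.add(ctx.websocket);
    ctx.websocket.on('close', () => clients.delete(ctx.websocket));
  });
}

function pushToClients(data) {
  for (const client of clients) {
    client.send(data);
  }
}
```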

12 changes: 6 additions & 6 deletions lib/MultidimensionalInterface.js
@@ -2,23 +2,23 @@ const DataEventManager = require('./DataEventManager');

class MultidimensionalInterface {

constructor(commMan) {
constructor(commMan, source) {
// Communication Manager object
this._commMan = commMan;
// Latest piece of data obtained
this._latestData = null;
// Subscribe to 'data' events
DataEventManager.subscribe('data', (...data) => this.onData(data));
// Subscribe to 'data' events from its particular source
DataEventManager.subscribe(`data-${source}`, (...data) => this.onData(data));

}

// Abstract methods to be overriden by interface implementations
onData(data) {}
// Abstract methods to be overridden by interface implementations
async onData(data) {}

setupPollInterfaces() {}

// Websocket interface creator
setupPubsupInterface(name, port) {
setupPubSubInterface(name, port) {
this.commMan.setupWebSocket(name, port);
}

31 changes: 31 additions & 0 deletions lib/RMLMapper.js
@@ -0,0 +1,31 @@
const RMLMapperWrapper = require('@rmlio/rmlmapper-java-wrapper');
const yarrrml = require('@rmlio/yarrrml-parser/lib/rml-generator');
const fs = require('fs');
const Utils = require('./Utils');

const rmlmapperPath = './mappings/rmlmapper.jar';
const tempFolderPath = './mappings/tmp';
const wrapper = new RMLMapperWrapper(rmlmapperPath, tempFolderPath, true);

async function yarrrml2rml(yml) {
try {
let quads = new yarrrml().convert(fs.readFileSync(yml, 'utf-8'));
return await Utils.quads2String(quads, 'application/trig');
} catch(err) {
console.error(err);
}
}

async function map(rml, source) {

const sources = {
'data.json': source
};

return await wrapper.execute(rml, {sources, generateMetadata: false});
}

module.exports = {
yarrrml2rml: yarrrml2rml,
map: map
};
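A hypothetical usage of this helper (sample file names and response shape are made up; the RMLMapper jar is expected at `./mappings/rmlmapper.jar` as configured above): convert a YARRRML mapping to RML once, then apply it to each raw JSON response.

```js
// Hypothetical usage sketch of lib/RMLMapper.js
const RMLMapper = require('./lib/RMLMapper');

(async () => {
  // Convert the YARRRML mapping referenced in config.json to RML (TriG string)
  const rml = await RMLMapper.yarrrml2rml('./mappings/gent.yml');

  // A made-up raw API response; the real one comes from the configured sourceUrl
  const apiResponse = JSON.stringify({ records: [{ id: '48-1', availableSpaces: 42 }] });

  const result = await RMLMapper.map(rml, apiResponse);
  console.log(result); // holds the RDF generated by the RMLMapper jar
})();
```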