Skip to content

Commit

Permalink
events implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreaCimminoArriaga committed Sep 16, 2021
1 parent 67b5867 commit fd8a8ef
Show file tree
Hide file tree
Showing 9 changed files with 262 additions and 39 deletions.
11 changes: 11 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,17 @@
<artifactId>spark-core</artifactId>
<version>2.9.3</version>
</dependency>
<dependency>
<groupId>org.javatuples</groupId>
<artifactId>javatuples</artifactId>
<version>1.2</version>
</dependency>
<!-- DNSN -->
<dependency>
<groupId>org.jmdns</groupId>
<artifactId>jmdns</artifactId>
<version>3.5.7</version>
</dependency>
<!-- Thing description translation -->
<dependency>
<groupId>es.upm.fi.oeg</groupId>
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/directory/Directory.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ public static void main(String[] args) {
});
path("/events", () -> {
get("", EventsController.subscribe);
get("/create", EventsController.subscribeCreate);
get("/update", EventsController.subscribeUpdate);
get("/delete", EventsController.subscribeDelete);
});
});

Expand All @@ -112,6 +115,8 @@ public static void main(String[] args) {
String logStr = Utils.buildMessage(request.requestMethod(), " ", request.pathInfo());
Directory.LOGGER.info(logStr);
});


}


Expand Down
20 changes: 20 additions & 0 deletions src/main/java/directory/events/DirectoryEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package directory.events;

public enum DirectoryEvent {

CREATE("create"),
DELETE("delete"),
UPDATE("update"),
ALL("all");
String event;

DirectoryEvent(String event) {
this.event = event;
}

public String getEvent() {
return event;
}


}
108 changes: 108 additions & 0 deletions src/main/java/directory/events/EventSystem.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package directory.events;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.javatuples.Triplet;

import directory.Directory;
import directory.Utils;
import info.macias.sse.EventBroadcast;
import info.macias.sse.events.MessageEvent;
import wot.jtd.model.Thing;

public class EventSystem {

private static List<Subscriber> subscriptions = new CopyOnWriteArrayList<>();
private static List<Triplet<MessageEvent, MessageEvent, DirectoryEvent>> queue = new CopyOnWriteArrayList<>();
public static final EventBroadcast broadcaster = new EventBroadcast();

public EventSystem() {
super();
}

// -- Subscription & update with past events

public void addSubscriber(Subscriber newSubscriber, String lastEventId) {
try {
subscriptions.add(newSubscriber);
broadcaster.addSubscriber(newSubscriber.getClient());
sendEventMessage(newSubscriber, lastEventId);
}catch(Exception e) {
Directory.LOGGER.error(e.toString());
}
}

private void sendEventMessage(Subscriber subscriber, String lastEventId) {
if(lastEventId!=null) {
int index;
if(lastEventId.equals("*")) {
index = 0;
}else{
for(index=0; index < queue.size(); index++) {
boolean breakloop = queue.get(index).getValue0().getId().equals(lastEventId);
if(breakloop)
break;
}
}
sendPastEventMessages(subscriber, index);
}
}

private void sendPastEventMessages(Subscriber subscriber, int skip) {
queue.stream().skip(skip)
.filter(triplet -> subscriberInterested(subscriber,triplet.getValue2()))
.forEach(triplet -> sendEventMessage(subscriber, triplet.getValue0(), triplet.getValue1()));
}

// -- Sending new events

public void igniteEvent(String thingId, DirectoryEvent event) {
igniteEvent(thingId, event, null);
}

public void igniteEvent(String thingId, DirectoryEvent event, Thing thing) {
String id = Utils.buildMessage(thingId, "/events/", event.getEvent());
String data = Utils.buildMessage("{\"id\": \"", thingId, "\"}");
String extendedData = prepareExtendedMessage(thing);
MessageEvent mesasage = new MessageEvent.Builder().setId(id).setEvent(event.getEvent()).setData(data).build();
MessageEvent extendedMesasage = new MessageEvent.Builder().setId(id).setEvent(event.getEvent()).setData(extendedData).build();

subscriptions.parallelStream()
.filter(subscriber -> subscriberInterested(subscriber,event))
.forEach(subscriber -> sendEventMessage(subscriber, mesasage, extendedMesasage));
Triplet<MessageEvent, MessageEvent, DirectoryEvent> pair = new Triplet<>(mesasage, extendedMesasage, event);
queue.add(pair);
}

private void sendEventMessage(Subscriber subscriber, MessageEvent mesasage, MessageEvent extendedMesasage) {
try {
if(subscriber.getDiff()) {
subscriber.getClient().send(extendedMesasage);
}else {
subscriber.getClient().send(mesasage);
}
}catch(Exception e) {
Directory.LOGGER.error(e.toString());
}
}

private boolean subscriberInterested(Subscriber subscriber, DirectoryEvent event) {
return subscriber.getEventType().equals(DirectoryEvent.ALL) || subscriber.getEventType().equals(event);
}

private String prepareExtendedMessage(Thing thing) {
String extendedMsg = null;
try {
if (thing != null)
extendedMsg = thing.toJson().toString();
} catch (Exception e) {
Directory.LOGGER.error(e.toString());
}
return extendedMsg;
}




}
69 changes: 43 additions & 26 deletions src/main/java/directory/events/EventsController.java
Original file line number Diff line number Diff line change
@@ -1,46 +1,63 @@
package directory.events;

import directory.Utils;
import info.macias.sse.EventBroadcast;
import info.macias.sse.events.MessageEvent;
import info.macias.sse.servlet3.ServletEventTarget;
import spark.Request;
import spark.Response;
import spark.Route;

public class EventsController {
public class EventsController {

// -- Attributes
public static EventBroadcast broadcaster = new EventBroadcast();

// -- Constructor
public static final EventSystem eventSystem = new EventSystem();
private static final String EVENT_MIME_TYPE = "text/event-stream";

// -- Constructor
private EventsController() {
super();
}

// -- Methods
public static Route subscribe = (Request request, Response response) -> {

String eventType = request.queryParams("type");
String includeChanged = request.queryParams("include_changes");

response.header(Utils.HEADER_CONTENT_TYPE, "text/event-stream");
//response.header("Last-Event-ID", "[id of the event]");
response.status(200);
broadcaster.addSubscriber(new ServletEventTarget(request.raw()));
igniteCreateEvent(null);
// -- Routing methods

public static final Route subscribe = (Request request, Response response) -> {
instantiateSubscriber(request, DirectoryEvent.ALL);
prepareResponse(response);
return "";
};

public static final Route subscribeCreate = (Request request, Response response) -> {
instantiateSubscriber(request, DirectoryEvent.CREATE);
prepareResponse(response);
return "";
};

public static final Route subscribeUpdate = (Request request, Response response) -> {
instantiateSubscriber(request, DirectoryEvent.UPDATE);
prepareResponse(response);
return "";
};

public static final Route subscribeDelete = (Request request, Response response) -> {
instantiateSubscriber(request, DirectoryEvent.DELETE);
prepareResponse(response);
return "";
};

private static void prepareResponse(Response response) {
response.header(Utils.HEADER_CONTENT_TYPE, EVENT_MIME_TYPE);
response.status(200);
}

public static void igniteCreateEvent(String thingId) {
String msg = Utils.buildMessage("event: create\ndata: {\"id\": \"",thingId,"\"}\nc\n\n");

MessageEvent welcome = new MessageEvent.Builder()
.setId("id of the event")
.setEvent("create")
.setData("the message").build();
broadcaster.broadcast(welcome);
System.out.println("adding event");
private static final String DIFF_PARAMETER = "diff";
private static void instantiateSubscriber(Request request, DirectoryEvent eventType) {
Boolean diff = request.queryParams(DIFF_PARAMETER)!=null && request.queryParams(DIFF_PARAMETER).equals("true");
ServletEventTarget client = new ServletEventTarget(request.raw());
Subscriber newSubscriber = new Subscriber(client, eventType, diff);
String lastEventId = request.headers("Last-Event-ID");
eventSystem.addSubscriber(newSubscriber, lastEventId);
}



}
52 changes: 52 additions & 0 deletions src/main/java/directory/events/Subscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package directory.events;

import info.macias.sse.servlet3.ServletEventTarget;

public class Subscriber {

private ServletEventTarget client;
private DirectoryEvent eventType;
private Boolean diff;


public Subscriber(ServletEventTarget client, DirectoryEvent eventType, Boolean diff) {
super();
this.client = client;
this.eventType = eventType;
this.diff = diff;
}


public ServletEventTarget getClient() {
return client;
}


public void setClient(ServletEventTarget client) {
this.client = client;
}


public DirectoryEvent getEventType() {
return eventType;
}


public void setEventType(DirectoryEvent eventType) {
this.eventType = eventType;
}


public Boolean getDiff() {
return diff;
}


public void setDiff(Boolean diff) {
this.diff = diff;
}




}
14 changes: 10 additions & 4 deletions src/main/java/directory/things/ThingsDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

import org.apache.jena.riot.RDFFormat;
import com.google.gson.JsonObject;

import directory.Utils;
import directory.events.DirectoryEvent;
import directory.events.EventsController;
import directory.exceptions.RemoteException;
import directory.triplestore.TriplestoreEndpoint;
Expand All @@ -20,7 +20,7 @@ public class ThingsDAO {
// -- Attributes

private static final String ASK_KEY = "boolean";
public static List<String> events = new CopyOnWriteArrayList<>();

// -- Constructor

private ThingsDAO() {
Expand All @@ -31,7 +31,7 @@ private ThingsDAO() {

// Create

public static Boolean create(Thing thing, String graphId) {
public static Boolean create(Thing thing, String graphId, Boolean exist) {
Boolean correct = false;
String query = null;
ThingsMapper.syntacticValidation(thing);
Expand All @@ -42,7 +42,12 @@ public static Boolean create(Thing thing, String graphId) {
if (messageResponse.length>0)
throw new RemoteException(new String(messageResponse));
correct = true;
EventsController.igniteCreateEvent(thing.getId());
if(exist) {
EventsController.eventSystem.igniteEvent(thing.getId(), DirectoryEvent.UPDATE, thing);
}else {
EventsController.eventSystem.igniteEvent(thing.getId(), DirectoryEvent.CREATE, thing);
}

return correct;
}

Expand Down Expand Up @@ -98,6 +103,7 @@ protected static boolean exist(String graphId) {
protected static void delete(String graphId) {
String query = Utils.buildMessage("DELETE { ?s ?p ?o } WHERE { GRAPH <",graphId,"> { ?s ?p ?o } }");
TriplestoreEndpoint.sendUpdateQuery(query);
EventsController.eventSystem.igniteEvent(graphId, DirectoryEvent.DELETE);
}


Expand Down
10 changes: 6 additions & 4 deletions src/main/java/directory/things/ThingsMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,12 @@ private static final Thing buildThing(RDFFormat format, String td) {
private static final List<Thing> buildThings(RDFFormat format, String td) {
List<Thing> things = new ArrayList<>();
try {
Model model = ModelFactory.createDefaultModel();
RDFReader reader = model.getReader(format.getLang().getName().toLowerCase());
reader.read(model, new ByteArrayInputStream(td.getBytes()), Directory.getConfiguration().getService().getDirectoryURIBase());
things = JTD.fromRDF(model);
if(td.length() > 0 && !td.equals("[]")) {
Model model = ModelFactory.createDefaultModel();
RDFReader reader = model.getReader(format.getLang().getName().toLowerCase());
reader.read(model, new ByteArrayInputStream(td.getBytes()), Directory.getConfiguration().getService().getDirectoryURIBase());
things = JTD.fromRDF(model);
}
} catch (SchemaValidationException | IllegalArgumentException e) {
throw new ThingParsingException(e.toString());
} catch (Exception e) {
Expand Down
Loading

0 comments on commit fd8a8ef

Please sign in to comment.