working streams

This commit is contained in:
2020-06-16 14:04:44 +02:00
parent ca70d5ce0a
commit 6e16bd5834
10 changed files with 235 additions and 110 deletions

17
pom.xml
View File

@@ -80,6 +80,17 @@
<artifactId>jeromq</artifactId>
<version>0.5.2</version>
</dependency>
<dependency>
<groupId>net.dv8tion</groupId>
<artifactId>JDA</artifactId>
<version>4.1.1_143</version>
<exclusions>
<exclusion>
<groupId>club.minnced</groupId>
<artifactId>opus-java</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
@@ -145,4 +156,10 @@
</properties>
</profile>
</profiles>
<repositories>
<repository>
<id>spring-repo</id>
<url>https://repo.spring.io/plugins-release/</url>
</repository>
</repositories>
</project>

View File

@@ -9,6 +9,8 @@ import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.annotations.Broadcast;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -18,51 +20,43 @@ import fun.manuel.data.CarrierAnalysesContext;
import fun.manuel.data.EddnJournal;
import fun.manuel.data.Message;
import fun.manuel.data.Message.Event;
import fun.manuel.data.UpdateType;
import fun.manuel.service.CarrierService;
import io.agroal.api.AgroalDataSource;
import io.quarkus.runtime.Startup;
@Startup
@ApplicationScoped
public class CarrierAnalyser {
@Inject
CarrierService service;
@Inject
@Broadcast
@Channel("carrier-context") Emitter<CarrierAnalysesContext> contextEmitter;
@Inject
@Broadcast
@Channel("carrier-update1") Emitter<CarrierAnalysesContext> updateEmitter1;
@Inject
@Broadcast
@Channel("carrier-update2") Emitter<CarrierAnalysesContext> updateEmitter2;
Logger logger = LoggerFactory.getLogger("Carrier Eventanalyser");
@Incoming(value = "eddn-carrier")
public void toContext(EddnJournal journal) {
@Incoming("eddn-carrier")
@Broadcast
@Outgoing("carrier-context")
public CarrierAnalysesContext toContext(EddnJournal journal) {
Event ev = journal.getMessage().getEvent();
Carrier freshCarrier = convertToCarrier(journal);
Carrier storedCarrier = service.getOrNull(freshCarrier.getId());
//logger.info("Carrier Event: " + freshCarrier.getId() + " | " + ev.toString());
contextEmitter.send(new CarrierAnalysesContext(freshCarrier, storedCarrier, journal));
System.out.println("Carrier Event: " + freshCarrier.getId() + " | " + ev.toString());
return new CarrierAnalysesContext(freshCarrier, storedCarrier, journal);
}
@Incoming(value = "carrier-context")
@Broadcast
@Outgoing("carrier-update")
public CarrierAnalysesContext filterUpdate(CarrierAnalysesContext context) {
public Multi<CarrierAnalysesContext> filterUpdate(CarrierAnalysesContext context) {
//logger.info(context.toString());
if(context.getStoredCarrier()==null) {
//new carrier
updateEmitter1.send(context);
updateEmitter2.send(context);
return context;
//register
context.setUpdatetype(UpdateType.Register);
return Multi.createFrom().item(context);
}
if(!context.getFreshCarrier().equals(context.getStoredCarrier())) {
@@ -70,25 +64,30 @@ public class CarrierAnalyser {
Carrier stored=context.getStoredCarrier();
if(!fresh.getSystem().equals(stored.getSystem())) {
//new system
updateEmitter1.send(context);
updateEmitter2.send(context);
return context;
context.setUpdatetype(UpdateType.Update);
return Multi.createFrom().item(context);
}
if(!(fresh.getBody().equals("")&&!stored.equals(""))) {
//adding body,
updateEmitter1.send(context);
updateEmitter2.send(context);
return context;
context.setUpdatetype(UpdateType.UpdateBody);
return Multi.createFrom().item(context);
}
}
return null;
return Multi.createFrom().empty();
}
@Incoming("carrier-update1")
@Transactional
@Incoming("carrier-update")
public void updateCarrier(CarrierAnalysesContext context) {
Carrier tostore=service.getOrCreate(context.getFreshCarrier());
tostore.setBody(context.getFreshCarrier().getBody());
tostore.setSystem(context.getFreshCarrier().getSystem());
switch (context.getUpdatetype()) {
case Update:
tostore.setBody(context.getFreshCarrier().getBody());
tostore.setSystem(context.getFreshCarrier().getSystem());
break;
case UpdateBody:
tostore.setBody(context.getFreshCarrier().getBody());
default:
break;
}
service.save(tostore);
}

View File

@@ -4,6 +4,7 @@ public class CarrierAnalysesContext {
private Carrier freshCarrier;
private Carrier storedCarrier;
private EddnJournal entry;
private UpdateType updatetype;
public CarrierAnalysesContext(Carrier freshCarrier, Carrier storedCarrier, EddnJournal entry) {
super();
@@ -11,6 +12,15 @@ public class CarrierAnalysesContext {
this.storedCarrier = storedCarrier;
this.entry = entry;
}
public UpdateType getUpdatetype() {
return updatetype;
}
public void setUpdatetype(UpdateType updatetype) {
this.updatetype = updatetype;
}
public Carrier getFreshCarrier() {
return freshCarrier;
}
@@ -29,10 +39,13 @@ public class CarrierAnalysesContext {
public void setEntry(EddnJournal entry) {
this.entry = entry;
}
@Override
public String toString() {
return "CarrierAnalysesContext [freshCarrier=" + freshCarrier + ", storedCarrier=" + storedCarrier + ", entry="
+ entry + "]";
+ entry + ", updatetype=" + updatetype + "]";
}
}

View File

@@ -0,0 +1,5 @@
package fun.manuel.data;
public enum UpdateType {
Register,Update,UpdateBody
}

View File

@@ -1,39 +1,69 @@
package fun.manuel.filter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import fun.manuel.data.EddnJournal;
import fun.manuel.data.Message;
import fun.manuel.data.Message.Event;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.annotations.Broadcast;
@ApplicationScoped
public class CarrierFilter {
@Inject
@Channel(value = "eddn-carrier")
@Broadcast
Emitter<EddnJournal> filtered;
private final BlockingQueue<EddnJournal> carrierQueue=new LinkedBlockingQueue<EddnJournal>();
@Incoming(value = "eddn-full")
public void filterCarrierOnlyEvents(EddnJournal ju) {
Event ev = ju.getMessage().getEvent();
Message msg = ju.getMessage();
fun.manuel.data.Message msg = ju.getMessage();
if (ev == Event.CARRIER_JUMP) {
filtered.send(ju);
carrierQueue.add(ju);
return;
}
if (msg.getAdditionalProperties().containsKey("StationType")) {
if (msg.getAdditionalProperties().get("StationType").equals("FleetCarrier")) {
filtered.send(ju);
carrierQueue.add(ju);
return;
}
}
//return Uni.createFrom().nothing();
}
@Outgoing("eddn-carrier")
public CompletionStage<EddnJournal> brodcastCarrierOnlyEvents() {
return CompletableFuture.supplyAsync(new Supplier<EddnJournal>() {
@Override
public EddnJournal get() {
// TODO Auto-generated method stub
try {
return carrierQueue.take();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
}
});
}
}

View File

@@ -10,7 +10,7 @@ import fun.manuel.data.Message.Event;
@ApplicationScoped
public class PrintToConsole {
@Incoming("carrier-update2")
@Incoming("carrier-update")
public void printUpdate(CarrierAnalysesContext context) {
System.out.println("Carrier Event: " + context.getFreshCarrier().getId() + " | " + context.getEntry().getMessage().getEvent().toString());

View File

@@ -13,8 +13,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import fun.manuel.data.Carrier;
import io.quarkus.runtime.Startup;
@Startup(1)
@ApplicationScoped
public class CarrierService {
@@ -59,7 +60,6 @@ public class CarrierService {
public void save(Carrier car) {
org.hibernate.Session session=em.unwrap(org.hibernate.Session.class);
session.saveOrUpdate(car);
session.close();
em.flush();
}

View File

@@ -0,0 +1,49 @@
package fun.manuel.service;
import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
import javax.security.auth.login.LoginException;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import io.quarkus.runtime.Startup;
import net.dv8tion.jda.api.JDA;
import net.dv8tion.jda.api.JDABuilder;
import net.dv8tion.jda.api.entities.Message;
import net.dv8tion.jda.api.entities.MessageChannel;
import net.dv8tion.jda.api.events.GenericEvent;
import net.dv8tion.jda.api.events.message.MessageReceivedEvent;
import net.dv8tion.jda.api.hooks.EventListener;
@Startup(3)
@ApplicationScoped
public class DiscordBotService {
@ConfigProperty(name = "bot.token")
private String bottoken;
private JDA jda;
@PostConstruct
private void startup() throws LoginException {
this.jda=JDABuilder.createDefault(bottoken).build();
}
EventListener pingpong=new EventListener() {
@Override
public void onEvent(GenericEvent eventg) {
if(eventg instanceof MessageReceivedEvent) {
MessageReceivedEvent event=(MessageReceivedEvent) eventg;
if (event.getAuthor().isBot()) return;
// We don't want to respond to other bot accounts, including ourself
Message message = event.getMessage();
String content = message.getContentRaw();
// getContentRaw() is an atomic getter
// getContentDisplay() is a lazy getter which modifies the content for e.g. console view (strip discord formatting)
if (content.equals("!ping"))
{
MessageChannel channel = event.getChannel();
channel.sendMessage("Pong!").queue(); // Important to call .queue() on the RestAction returned by sendMessage(...)
}
}
}
};
}

View File

@@ -2,6 +2,7 @@ package org.eddn;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
@@ -14,7 +15,9 @@ import fun.manuel.data.Carrier;
import fun.manuel.data.EddnJournal;
import fun.manuel.data.Message;
import fun.manuel.data.Message.Event;
import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.Startup;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.annotations.Broadcast;
import java.awt.AWTException;
@@ -37,9 +40,11 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
@@ -54,29 +59,38 @@ import javax.sound.sampled.LineUnavailableException;
* Subscribe to zmq relay from EDDN
*/
@ApplicationScoped
@Startup
public class EddnPump extends Thread {
@Startup(0)
public class EddnPump {
public static final String SCHEMA_KEY = "$schemaRef";
public static final String RELAY = "tcp://eddn.edcd.io:9500";
public static final ArrayList<EddnJournal> list = new ArrayList<EddnJournal>();
public static final File dbfile = new File("stream.json");
public static FileWriter writer = null;
public static ExecutorService computeservice = Executors.newCachedThreadPool();
public static ExecutorService computeservice = Executors.newSingleThreadExecutor();
public static HashMap<String, Carrier> carrierMap = new HashMap<String, Carrier>();
public BlockingQueue<EddnJournal> journalqueue = new LinkedBlockingQueue<EddnJournal>();
public TrayIcon trayIcon;
public boolean init = true;
@Inject
@Channel(value = "eddn-full")
@Broadcast
Emitter<EddnJournal> fulllog;
@Outgoing("eddn-full")
public Uni<EddnJournal> publishJournal() {
try {
return Uni.createFrom().item(journalqueue.take());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
}
public void msg(String msg) {
ObjectMapper mapper = new ObjectMapper();
try {
EddnJournal journal = mapper.readValue(msg, EddnJournal.class);
fulllog.send(journal).toCompletableFuture().join();
journalqueue.add(journal);
} catch (JsonMappingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
@@ -89,51 +103,50 @@ public class EddnPump extends Thread {
}
}
@Override
public void run() {
pump();
}
Runnable pump = new Runnable() {
public synchronized void pump() {
ZContext ctx = new ZContext();
ZMQ.Socket client = ctx.createSocket(ZMQ.SUB);
client.subscribe("".getBytes());
client.setReceiveTimeOut(30000);
@Override
public void run() {
// TODO Auto-generated method stub
ZContext ctx = new ZContext();
ZMQ.Socket client = ctx.createSocket(ZMQ.SUB);
client.subscribe("".getBytes());
client.setReceiveTimeOut(30000);
client.connect(RELAY);
System.out.println("EDDN Relay connected");
ZMQ.Poller poller = ctx.createPoller(2);
poller.register(client, ZMQ.Poller.POLLIN);
byte[] output = new byte[256 * 1024];
while (true) {
int poll = poller.poll(10);
if (poll == ZMQ.Poller.POLLIN) {
ZMQ.PollItem item = poller.getItem(poll);
client.connect(RELAY);
System.out.println("EDDN Relay connected");
ZMQ.Poller poller = ctx.createPoller(2);
poller.register(client, ZMQ.Poller.POLLIN);
byte[] output = new byte[256 * 1024];
while (true) {
int poll = poller.poll(10);
if (poll == ZMQ.Poller.POLLIN) {
ZMQ.PollItem item = poller.getItem(poll);
if (poller.pollin(0)) {
byte[] recv = client.recv(ZMQ.NOBLOCK);
if (recv.length > 0) {
// decompress
Inflater inflater = new Inflater();
inflater.setInput(recv);
try {
int outlen = inflater.inflate(output);
String outputString = new String(output, 0, outlen, "UTF-8");
// outputString contains a json message
if (poller.pollin(0)) {
byte[] recv = client.recv(ZMQ.NOBLOCK);
if (recv.length > 0) {
// decompress
Inflater inflater = new Inflater();
inflater.setInput(recv);
try {
int outlen = inflater.inflate(output);
String outputString = new String(output, 0, outlen, "UTF-8");
// outputString contains a json message
if (outputString.contains(SCHEMA_KEY)) {
msg(outputString);
if (outputString.contains(SCHEMA_KEY)) {
msg(outputString);
}
} catch (DataFormatException | IOException e) {
e.printStackTrace();
}
} catch (DataFormatException | IOException e) {
e.printStackTrace();
}
}
}
}
}
}
};
public Runnable analyse(EddnJournal journal) {
Runnable analyse = new Runnable() {
@@ -174,49 +187,49 @@ public class EddnPump extends Thread {
String body = (String) msg.getAdditionalProperties().getOrDefault("Body", "");
String system = msg.getStarSystem();
Carrier car = carrierMap.getOrDefault(name, new Carrier(system, body, name));
String prevsystem=car.getSystem();
String prevsystem = car.getSystem();
// Update
car.setSystem(system);
car.setBody(body);
System.out.println("Carrier Event: " + car.getId() + " | " + ev.toString());
Carrier prev = carrierMap.get(name);
if (ev == Event.CARRIER_JUMP) {
System.out.println("\t" + prevsystem + " -> " + car.getSystem());
if (!init) {
//trayIcon.displayMessage("Carrier Jump", prev.getSystem() + " -> " + car.getSystem(),
// MessageType.INFO);
// trayIcon.displayMessage("Carrier Jump", prev.getSystem() + " -> " +
// car.getSystem(),
// MessageType.INFO);
}
}
if(car.getId().equals("V61-W9H")) {
if (car.getId().equals("V61-W9H")) {
System.out.println("\tFCC Noms Event: " + journal.getMessage().toString());
trayIcon.displayMessage("FCC Noms ALERT","FCC Noms event detected",
MessageType.INFO);
trayIcon.displayMessage("FCC Noms ALERT", "FCC Noms event detected", MessageType.INFO);
}
if (carrierMap.containsKey(name)) {
if(!car.getSystem().equals(prevsystem)) {
if (!car.getSystem().equals(prevsystem)) {
System.out.println("\tUpdated: " + car.toString());
if (prev.getSystem().contains("COL 285 SECTOR XN-D A28-2")) {
if (!init) {
trayIcon.displayMessage("Carrier TARGET ALERT","Carrier Jump to targeted system detected",
MessageType.INFO);
trayIcon.displayMessage("Carrier TARGET ALERT",
"Carrier Jump to targeted system detected", MessageType.INFO);
}
System.out.println("\tCarrier Jump from targeted system detected");
}
if (car.getSystem().contains("COL 285 SECTOR XN-D A28-2")) {
if (!init) {
trayIcon.displayMessage("Carrier TARGET ALERT","Carrier Jump to targeted system detected",
MessageType.INFO);
trayIcon.displayMessage("Carrier TARGET ALERT",
"Carrier Jump to targeted system detected", MessageType.INFO);
}
System.out.println("\tCarrier Jump to targeted system detected");
}
if (car.getSystem().contains("COL 285 SECTOR CC-K A38-2")) {
if (!init) {
trayIcon.displayMessage("Carrier TARGET ALERT","Carrier Jump to targeted system detected",
MessageType.INFO);
trayIcon.displayMessage("Carrier TARGET ALERT",
"Carrier Jump to targeted system detected", MessageType.INFO);
}
System.out.println("\tCarrier Jump to targeted system detected");
}
@@ -312,13 +325,11 @@ public class EddnPump extends Thread {
}
return sortedMap;
}
@PostConstruct
private void startup()
throws IOException, InterruptedException, LineUnavailableException, AWTException {
ObjectMapper mapper = new ObjectMapper();
start();
@PostConstruct
private void startup() throws IOException, InterruptedException, LineUnavailableException, AWTException {
System.out.println("Scheduled Pump");
computeservice.execute(pump);
}
}

View File

@@ -2,4 +2,5 @@
# key = value
quarkus.datasource.db-kind=h2
quarkus.datasource.jdbc.url=jdbc:h2:~\\default
quarkus.hibernate-orm.database.generation=create
quarkus.hibernate-orm.database.generation=none
bot.token=NzIyNDEyODk5Nzc5NDc3NTk0.Xuivdw.hiqHtxIG6jLXukYDJvhNYxW95P4