diff --git a/pom.xml b/pom.xml index 43622d9..6058e39 100644 --- a/pom.xml +++ b/pom.xml @@ -80,6 +80,17 @@ jeromq 0.5.2 + + net.dv8tion + JDA + 4.1.1_143 + + + club.minnced + opus-java + + + @@ -145,4 +156,10 @@ + + + spring-repo + https://repo.spring.io/plugins-release/ + + diff --git a/src/main/java/fun/manuel/CarrierAnalyser.java b/src/main/java/fun/manuel/CarrierAnalyser.java index be3ef23..2e52e44 100644 --- a/src/main/java/fun/manuel/CarrierAnalyser.java +++ b/src/main/java/fun/manuel/CarrierAnalyser.java @@ -9,6 +9,8 @@ import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Emitter; import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.Outgoing; + +import io.smallrye.mutiny.Multi; import io.smallrye.reactive.messaging.annotations.Broadcast; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,51 +20,43 @@ import fun.manuel.data.CarrierAnalysesContext; import fun.manuel.data.EddnJournal; import fun.manuel.data.Message; import fun.manuel.data.Message.Event; +import fun.manuel.data.UpdateType; import fun.manuel.service.CarrierService; import io.agroal.api.AgroalDataSource; import io.quarkus.runtime.Startup; -@Startup + @ApplicationScoped public class CarrierAnalyser { @Inject CarrierService service; - - @Inject - @Broadcast - @Channel("carrier-context") Emitter contextEmitter; - @Inject - @Broadcast - @Channel("carrier-update1") Emitter updateEmitter1; - @Inject - @Broadcast - @Channel("carrier-update2") Emitter updateEmitter2; Logger logger = LoggerFactory.getLogger("Carrier Eventanalyser"); - @Incoming(value = "eddn-carrier") - public void toContext(EddnJournal journal) { + @Incoming("eddn-carrier") + @Broadcast + @Outgoing("carrier-context") + public CarrierAnalysesContext toContext(EddnJournal journal) { Event ev = journal.getMessage().getEvent(); Carrier freshCarrier = convertToCarrier(journal); Carrier storedCarrier = service.getOrNull(freshCarrier.getId()); - //logger.info("Carrier Event: " + freshCarrier.getId() + " | " + ev.toString()); - contextEmitter.send(new CarrierAnalysesContext(freshCarrier, storedCarrier, journal)); + System.out.println("Carrier Event: " + freshCarrier.getId() + " | " + ev.toString()); + return new CarrierAnalysesContext(freshCarrier, storedCarrier, journal); } @Incoming(value = "carrier-context") @Broadcast @Outgoing("carrier-update") - public CarrierAnalysesContext filterUpdate(CarrierAnalysesContext context) { + public Multi filterUpdate(CarrierAnalysesContext context) { //logger.info(context.toString()); if(context.getStoredCarrier()==null) { - //new carrier - updateEmitter1.send(context); - updateEmitter2.send(context); - return context; + //register + context.setUpdatetype(UpdateType.Register); + return Multi.createFrom().item(context); } if(!context.getFreshCarrier().equals(context.getStoredCarrier())) { @@ -70,25 +64,30 @@ public class CarrierAnalyser { Carrier stored=context.getStoredCarrier(); if(!fresh.getSystem().equals(stored.getSystem())) { //new system - updateEmitter1.send(context); - updateEmitter2.send(context); - return context; + context.setUpdatetype(UpdateType.Update); + return Multi.createFrom().item(context); } if(!(fresh.getBody().equals("")&&!stored.equals(""))) { //adding body, - updateEmitter1.send(context); - updateEmitter2.send(context); - return context; + context.setUpdatetype(UpdateType.UpdateBody); + return Multi.createFrom().item(context); } } - return null; + return Multi.createFrom().empty(); } - @Incoming("carrier-update1") - @Transactional + @Incoming("carrier-update") public void updateCarrier(CarrierAnalysesContext context) { Carrier tostore=service.getOrCreate(context.getFreshCarrier()); - tostore.setBody(context.getFreshCarrier().getBody()); - tostore.setSystem(context.getFreshCarrier().getSystem()); + switch (context.getUpdatetype()) { + case Update: + tostore.setBody(context.getFreshCarrier().getBody()); + tostore.setSystem(context.getFreshCarrier().getSystem()); + break; + case UpdateBody: + tostore.setBody(context.getFreshCarrier().getBody()); + default: + break; + } service.save(tostore); } diff --git a/src/main/java/fun/manuel/data/CarrierAnalysesContext.java b/src/main/java/fun/manuel/data/CarrierAnalysesContext.java index ece3c91..c4d85d2 100644 --- a/src/main/java/fun/manuel/data/CarrierAnalysesContext.java +++ b/src/main/java/fun/manuel/data/CarrierAnalysesContext.java @@ -4,6 +4,7 @@ public class CarrierAnalysesContext { private Carrier freshCarrier; private Carrier storedCarrier; private EddnJournal entry; + private UpdateType updatetype; public CarrierAnalysesContext(Carrier freshCarrier, Carrier storedCarrier, EddnJournal entry) { super(); @@ -11,6 +12,15 @@ public class CarrierAnalysesContext { this.storedCarrier = storedCarrier; this.entry = entry; } + + public UpdateType getUpdatetype() { + return updatetype; + } + + public void setUpdatetype(UpdateType updatetype) { + this.updatetype = updatetype; + } + public Carrier getFreshCarrier() { return freshCarrier; } @@ -29,10 +39,13 @@ public class CarrierAnalysesContext { public void setEntry(EddnJournal entry) { this.entry = entry; } + @Override public String toString() { return "CarrierAnalysesContext [freshCarrier=" + freshCarrier + ", storedCarrier=" + storedCarrier + ", entry=" - + entry + "]"; + + entry + ", updatetype=" + updatetype + "]"; } + + } diff --git a/src/main/java/fun/manuel/data/UpdateType.java b/src/main/java/fun/manuel/data/UpdateType.java new file mode 100644 index 0000000..8e6f2fa --- /dev/null +++ b/src/main/java/fun/manuel/data/UpdateType.java @@ -0,0 +1,5 @@ +package fun.manuel.data; + +public enum UpdateType { +Register,Update,UpdateBody +} diff --git a/src/main/java/fun/manuel/filter/CarrierFilter.java b/src/main/java/fun/manuel/filter/CarrierFilter.java index aaa1873..1c35fcb 100644 --- a/src/main/java/fun/manuel/filter/CarrierFilter.java +++ b/src/main/java/fun/manuel/filter/CarrierFilter.java @@ -1,39 +1,69 @@ package fun.manuel.filter; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Supplier; + import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Emitter; import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.Outgoing; import fun.manuel.data.EddnJournal; -import fun.manuel.data.Message; import fun.manuel.data.Message.Event; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.annotations.Broadcast; @ApplicationScoped public class CarrierFilter { - - @Inject - @Channel(value = "eddn-carrier") - @Broadcast - Emitter filtered; - + private final BlockingQueue carrierQueue=new LinkedBlockingQueue(); + @Incoming(value = "eddn-full") + public void filterCarrierOnlyEvents(EddnJournal ju) { Event ev = ju.getMessage().getEvent(); - Message msg = ju.getMessage(); + fun.manuel.data.Message msg = ju.getMessage(); if (ev == Event.CARRIER_JUMP) { - filtered.send(ju); + carrierQueue.add(ju); + return; } if (msg.getAdditionalProperties().containsKey("StationType")) { if (msg.getAdditionalProperties().get("StationType").equals("FleetCarrier")) { - filtered.send(ju); + carrierQueue.add(ju); + return; } } + //return Uni.createFrom().nothing(); + + } + + @Outgoing("eddn-carrier") + public CompletionStage brodcastCarrierOnlyEvents() { + + + return CompletableFuture.supplyAsync(new Supplier() { + + @Override + public EddnJournal get() { + // TODO Auto-generated method stub + try { + return carrierQueue.take(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return null; + } + }); + } } diff --git a/src/main/java/fun/manuel/handler/PrintToConsole.java b/src/main/java/fun/manuel/handler/PrintToConsole.java index fe6bab8..dff4c6a 100644 --- a/src/main/java/fun/manuel/handler/PrintToConsole.java +++ b/src/main/java/fun/manuel/handler/PrintToConsole.java @@ -10,7 +10,7 @@ import fun.manuel.data.Message.Event; @ApplicationScoped public class PrintToConsole { - @Incoming("carrier-update2") + @Incoming("carrier-update") public void printUpdate(CarrierAnalysesContext context) { System.out.println("Carrier Event: " + context.getFreshCarrier().getId() + " | " + context.getEntry().getMessage().getEvent().toString()); diff --git a/src/main/java/fun/manuel/service/CarrierService.java b/src/main/java/fun/manuel/service/CarrierService.java index 387b9b4..a45e344 100644 --- a/src/main/java/fun/manuel/service/CarrierService.java +++ b/src/main/java/fun/manuel/service/CarrierService.java @@ -13,8 +13,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import fun.manuel.data.Carrier; +import io.quarkus.runtime.Startup; - +@Startup(1) @ApplicationScoped public class CarrierService { @@ -59,7 +60,6 @@ public class CarrierService { public void save(Carrier car) { org.hibernate.Session session=em.unwrap(org.hibernate.Session.class); session.saveOrUpdate(car); - session.close(); em.flush(); } diff --git a/src/main/java/fun/manuel/service/DiscordBotService.java b/src/main/java/fun/manuel/service/DiscordBotService.java new file mode 100644 index 0000000..198b1e0 --- /dev/null +++ b/src/main/java/fun/manuel/service/DiscordBotService.java @@ -0,0 +1,49 @@ +package fun.manuel.service; + +import javax.annotation.PostConstruct; +import javax.enterprise.context.ApplicationScoped; +import javax.security.auth.login.LoginException; + +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import io.quarkus.runtime.Startup; +import net.dv8tion.jda.api.JDA; +import net.dv8tion.jda.api.JDABuilder; +import net.dv8tion.jda.api.entities.Message; +import net.dv8tion.jda.api.entities.MessageChannel; +import net.dv8tion.jda.api.events.GenericEvent; +import net.dv8tion.jda.api.events.message.MessageReceivedEvent; +import net.dv8tion.jda.api.hooks.EventListener; +@Startup(3) +@ApplicationScoped +public class DiscordBotService { + + @ConfigProperty(name = "bot.token") + private String bottoken; + private JDA jda; + @PostConstruct + private void startup() throws LoginException { + this.jda=JDABuilder.createDefault(bottoken).build(); + } + + EventListener pingpong=new EventListener() { + + @Override + public void onEvent(GenericEvent eventg) { + if(eventg instanceof MessageReceivedEvent) { + MessageReceivedEvent event=(MessageReceivedEvent) eventg; + if (event.getAuthor().isBot()) return; + // We don't want to respond to other bot accounts, including ourself + Message message = event.getMessage(); + String content = message.getContentRaw(); + // getContentRaw() is an atomic getter + // getContentDisplay() is a lazy getter which modifies the content for e.g. console view (strip discord formatting) + if (content.equals("!ping")) + { + MessageChannel channel = event.getChannel(); + channel.sendMessage("Pong!").queue(); // Important to call .queue() on the RestAction returned by sendMessage(...) + } + } + } + }; +} diff --git a/src/main/java/org/eddn/EddnPump.java b/src/main/java/org/eddn/EddnPump.java index 56d0170..e551967 100644 --- a/src/main/java/org/eddn/EddnPump.java +++ b/src/main/java/org/eddn/EddnPump.java @@ -2,6 +2,7 @@ package org.eddn; import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Emitter; +import org.eclipse.microprofile.reactive.messaging.Outgoing; import org.zeromq.ZContext; import org.zeromq.ZMQ; @@ -14,7 +15,9 @@ import fun.manuel.data.Carrier; import fun.manuel.data.EddnJournal; import fun.manuel.data.Message; import fun.manuel.data.Message.Event; +import io.quarkus.runtime.Quarkus; import io.quarkus.runtime.Startup; +import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.annotations.Broadcast; import java.awt.AWTException; @@ -37,9 +40,11 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.zip.DataFormatException; import java.util.zip.Inflater; @@ -54,29 +59,38 @@ import javax.sound.sampled.LineUnavailableException; * Subscribe to zmq relay from EDDN */ @ApplicationScoped -@Startup -public class EddnPump extends Thread { +@Startup(0) +public class EddnPump { public static final String SCHEMA_KEY = "$schemaRef"; public static final String RELAY = "tcp://eddn.edcd.io:9500"; public static final ArrayList list = new ArrayList(); public static final File dbfile = new File("stream.json"); public static FileWriter writer = null; - public static ExecutorService computeservice = Executors.newCachedThreadPool(); + public static ExecutorService computeservice = Executors.newSingleThreadExecutor(); public static HashMap carrierMap = new HashMap(); + public BlockingQueue journalqueue = new LinkedBlockingQueue(); + public TrayIcon trayIcon; public boolean init = true; - - @Inject - @Channel(value = "eddn-full") - @Broadcast - Emitter fulllog; + + @Outgoing("eddn-full") + public Uni publishJournal() { + try { + return Uni.createFrom().item(journalqueue.take()); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return null; + + } public void msg(String msg) { ObjectMapper mapper = new ObjectMapper(); try { EddnJournal journal = mapper.readValue(msg, EddnJournal.class); - fulllog.send(journal).toCompletableFuture().join(); + journalqueue.add(journal); } catch (JsonMappingException e) { // TODO Auto-generated catch block e.printStackTrace(); @@ -89,51 +103,50 @@ public class EddnPump extends Thread { } } - @Override - public void run() { - pump(); - } + Runnable pump = new Runnable() { - public synchronized void pump() { - ZContext ctx = new ZContext(); - ZMQ.Socket client = ctx.createSocket(ZMQ.SUB); - client.subscribe("".getBytes()); - client.setReceiveTimeOut(30000); + @Override + public void run() { + // TODO Auto-generated method stub + ZContext ctx = new ZContext(); + ZMQ.Socket client = ctx.createSocket(ZMQ.SUB); + client.subscribe("".getBytes()); + client.setReceiveTimeOut(30000); - client.connect(RELAY); - System.out.println("EDDN Relay connected"); - ZMQ.Poller poller = ctx.createPoller(2); - poller.register(client, ZMQ.Poller.POLLIN); - byte[] output = new byte[256 * 1024]; - while (true) { - int poll = poller.poll(10); - if (poll == ZMQ.Poller.POLLIN) { - ZMQ.PollItem item = poller.getItem(poll); + client.connect(RELAY); + System.out.println("EDDN Relay connected"); + ZMQ.Poller poller = ctx.createPoller(2); + poller.register(client, ZMQ.Poller.POLLIN); + byte[] output = new byte[256 * 1024]; + while (true) { + int poll = poller.poll(10); + if (poll == ZMQ.Poller.POLLIN) { + ZMQ.PollItem item = poller.getItem(poll); - if (poller.pollin(0)) { - byte[] recv = client.recv(ZMQ.NOBLOCK); - if (recv.length > 0) { - // decompress - Inflater inflater = new Inflater(); - inflater.setInput(recv); - try { - int outlen = inflater.inflate(output); - String outputString = new String(output, 0, outlen, "UTF-8"); - // outputString contains a json message + if (poller.pollin(0)) { + byte[] recv = client.recv(ZMQ.NOBLOCK); + if (recv.length > 0) { + // decompress + Inflater inflater = new Inflater(); + inflater.setInput(recv); + try { + int outlen = inflater.inflate(output); + String outputString = new String(output, 0, outlen, "UTF-8"); + // outputString contains a json message - if (outputString.contains(SCHEMA_KEY)) { - msg(outputString); + if (outputString.contains(SCHEMA_KEY)) { + msg(outputString); + } + + } catch (DataFormatException | IOException e) { + e.printStackTrace(); } - - } catch (DataFormatException | IOException e) { - e.printStackTrace(); } } } } } - } - + }; public Runnable analyse(EddnJournal journal) { Runnable analyse = new Runnable() { @@ -174,49 +187,49 @@ public class EddnPump extends Thread { String body = (String) msg.getAdditionalProperties().getOrDefault("Body", ""); String system = msg.getStarSystem(); Carrier car = carrierMap.getOrDefault(name, new Carrier(system, body, name)); - String prevsystem=car.getSystem(); + String prevsystem = car.getSystem(); // Update car.setSystem(system); car.setBody(body); System.out.println("Carrier Event: " + car.getId() + " | " + ev.toString()); Carrier prev = carrierMap.get(name); - + if (ev == Event.CARRIER_JUMP) { System.out.println("\t" + prevsystem + " -> " + car.getSystem()); if (!init) { - //trayIcon.displayMessage("Carrier Jump", prev.getSystem() + " -> " + car.getSystem(), - // MessageType.INFO); + // trayIcon.displayMessage("Carrier Jump", prev.getSystem() + " -> " + + // car.getSystem(), + // MessageType.INFO); } } - if(car.getId().equals("V61-W9H")) { + if (car.getId().equals("V61-W9H")) { System.out.println("\tFCC Noms Event: " + journal.getMessage().toString()); - trayIcon.displayMessage("FCC Noms ALERT","FCC Noms event detected", - MessageType.INFO); + trayIcon.displayMessage("FCC Noms ALERT", "FCC Noms event detected", MessageType.INFO); } if (carrierMap.containsKey(name)) { - if(!car.getSystem().equals(prevsystem)) { + if (!car.getSystem().equals(prevsystem)) { System.out.println("\tUpdated: " + car.toString()); if (prev.getSystem().contains("COL 285 SECTOR XN-D A28-2")) { if (!init) { - trayIcon.displayMessage("Carrier TARGET ALERT","Carrier Jump to targeted system detected", - MessageType.INFO); + trayIcon.displayMessage("Carrier TARGET ALERT", + "Carrier Jump to targeted system detected", MessageType.INFO); } System.out.println("\tCarrier Jump from targeted system detected"); } if (car.getSystem().contains("COL 285 SECTOR XN-D A28-2")) { if (!init) { - trayIcon.displayMessage("Carrier TARGET ALERT","Carrier Jump to targeted system detected", - MessageType.INFO); + trayIcon.displayMessage("Carrier TARGET ALERT", + "Carrier Jump to targeted system detected", MessageType.INFO); } System.out.println("\tCarrier Jump to targeted system detected"); } if (car.getSystem().contains("COL 285 SECTOR CC-K A38-2")) { if (!init) { - trayIcon.displayMessage("Carrier TARGET ALERT","Carrier Jump to targeted system detected", - MessageType.INFO); + trayIcon.displayMessage("Carrier TARGET ALERT", + "Carrier Jump to targeted system detected", MessageType.INFO); } System.out.println("\tCarrier Jump to targeted system detected"); } @@ -312,13 +325,11 @@ public class EddnPump extends Thread { } return sortedMap; } - - @PostConstruct - private void startup() - throws IOException, InterruptedException, LineUnavailableException, AWTException { - ObjectMapper mapper = new ObjectMapper(); - start(); + @PostConstruct + private void startup() throws IOException, InterruptedException, LineUnavailableException, AWTException { + System.out.println("Scheduled Pump"); + computeservice.execute(pump); } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 7dc2d92..b693c0b 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -2,4 +2,5 @@ # key = value quarkus.datasource.db-kind=h2 quarkus.datasource.jdbc.url=jdbc:h2:~\\default -quarkus.hibernate-orm.database.generation=create \ No newline at end of file +quarkus.hibernate-orm.database.generation=none +bot.token=NzIyNDEyODk5Nzc5NDc3NTk0.Xuivdw.hiqHtxIG6jLXukYDJvhNYxW95P4 \ No newline at end of file