diff --git a/src/main/java/nl/wissehes/javatrain/DataReceiver.java b/src/main/java/nl/wissehes/javatrain/DataReceiver.java index 1424787..1254114 100644 --- a/src/main/java/nl/wissehes/javatrain/DataReceiver.java +++ b/src/main/java/nl/wissehes/javatrain/DataReceiver.java @@ -2,6 +2,7 @@ import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; +import nl.wissehes.javatrain.model.SiriMessage; import nl.wissehes.javatrain.util.GZipUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,7 +15,8 @@ @Component public class DataReceiver { - private final ZMQ.Socket subscriber; + private final ZMQ.Socket infoplusSubscriber; + private final ZMQ.Socket siriSubscriber; private final DataStore dataStore; Logger logger = LoggerFactory.getLogger(DataReceiver.class); @@ -22,8 +24,9 @@ public class DataReceiver { @Value("${infoplus.log-messages:false}") private boolean shouldLogMessages; - public DataReceiver(ZMQ.Socket subscriber, DataStore dataStore) { - this.subscriber = subscriber; + public DataReceiver(ZmqConfig config, DataStore dataStore) { + this.infoplusSubscriber = config.infoplusSubscriber(); + this.siriSubscriber = config.siriSubscriber(); this.dataStore = dataStore; } @@ -31,11 +34,23 @@ public DataReceiver(ZMQ.Socket subscriber, DataStore dataStore) { public void startListening() { new Thread(() -> { while (!Thread.currentThread().isInterrupted()) { - String topic = subscriber.recvStr(); // Receive topic - byte[] messageBytes = subscriber.recv(); // Receive the compressed message as bytes + String topic = infoplusSubscriber.recvStr(); // Receive topic + byte[] messageBytes = infoplusSubscriber.recv(); // Receive the compressed message as bytes this.handleMessage(topic, messageBytes); } }).start(); + + new Thread(() -> { + while (!Thread.currentThread().isInterrupted()) { + String topic = siriSubscriber.recvStr(); // Receive topic + byte[] messageBytes = siriSubscriber.recv(); // Receive the compressed message as bytes + if(!topic.startsWith("/HTM") && !topic.startsWith("/GVB")) { + System.out.println("Topic: " + topic); + System.out.println("Message: " + messageBytes.length); + this.handleMessage(topic, messageBytes); + } + } + }).start(); } private void handleMessage(String topic, byte[] messageBytes) { @@ -50,6 +65,7 @@ private void handleMessage(String topic, byte[] messageBytes) { case ZmqConfig.DVS_TOPIC -> dataStore.addDeparture(message); case ZmqConfig.RIT_TOPIC -> dataStore.addJourney(message); case ZmqConfig.POS_TOPIC -> dataStore.addPosition(message); + default -> dataStore.addRawSiriMessage(new SiriMessage(topic, message)); } } catch (IOException e) { logger.error("Failed to decompress message: {}", e.getMessage()); @@ -58,6 +74,6 @@ private void handleMessage(String topic, byte[] messageBytes) { @PreDestroy public void stopListening() { - subscriber.close(); // Clean up resources + infoplusSubscriber.close(); // Clean up resources } } diff --git a/src/main/java/nl/wissehes/javatrain/DataStore.java b/src/main/java/nl/wissehes/javatrain/DataStore.java index 05fcb72..e1359c6 100644 --- a/src/main/java/nl/wissehes/javatrain/DataStore.java +++ b/src/main/java/nl/wissehes/javatrain/DataStore.java @@ -5,6 +5,7 @@ import nl.wissehes.javatrain.mapper.PositionsMapper; import nl.wissehes.javatrain.model.NDOV.DVS.DepartureDocument; import nl.wissehes.javatrain.model.NDOV.RIT.JourneyDocument; +import nl.wissehes.javatrain.model.SiriMessage; import nl.wissehes.javatrain.model.departure.Departure; import nl.wissehes.javatrain.model.departure.TrainStatus; import nl.wissehes.javatrain.model.journey.Journey; @@ -35,6 +36,8 @@ public final class DataStore { private final Map stations = new HashMap<>(); + private final List rawSiriMessages = new LinkedList<>(); + private DataStore() { } @@ -87,6 +90,13 @@ public void addPosition(String message) { } } + /** + * Add a raw SIRI message to the data store + */ + public void addRawSiriMessage(SiriMessage message) { + rawSiriMessages.add(message); + } + /** * Get the departures */ @@ -145,6 +155,10 @@ public List getRawPositions() { return rawPositions; } + public List getRawSiriMessages() { + return rawSiriMessages; + } + /** * Get all found stations */ diff --git a/src/main/java/nl/wissehes/javatrain/ZmqConfig.java b/src/main/java/nl/wissehes/javatrain/ZmqConfig.java index 1a7b9f2..b183486 100644 --- a/src/main/java/nl/wissehes/javatrain/ZmqConfig.java +++ b/src/main/java/nl/wissehes/javatrain/ZmqConfig.java @@ -3,16 +3,19 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Service; import org.zeromq.SocketType; import org.zeromq.ZContext; import org.zeromq.ZMQ; -@Configuration +@Service public class ZmqConfig { @Value("${infoplus.endpoint}") private String ZMQ_SUBSCRIBER_ADDRESS; + private String SIRI_ADDRESS = "tcp://pubsub.besteffort.ndovloket.nl:7666"; + /** Departures */ public static final String DVS_TOPIC = "/RIG/InfoPlusDVSInterface4"; /** Arrivals */ @@ -22,8 +25,8 @@ public class ZmqConfig { /** Positions */ public static final String POS_TOPIC = "/RIG/NStreinpositiesInterface5"; - @Bean(destroyMethod = "close") - public ZMQ.Socket zmqSubscriber() { +// @Bean(destroyMethod = "close") + public ZMQ.Socket infoplusSubscriber() { ZContext context = new ZContext(); ZMQ.Socket subscriber = context.createSocket(SocketType.SUB); subscriber.connect(ZMQ_SUBSCRIBER_ADDRESS); @@ -34,4 +37,14 @@ public ZMQ.Socket zmqSubscriber() { return subscriber; } + + public ZMQ.Socket siriSubscriber() { + ZContext context = new ZContext(); + ZMQ.Socket subscriber = context.createSocket(SocketType.SUB); + subscriber.connect(SIRI_ADDRESS); + + subscriber.subscribe("/".getBytes(ZMQ.CHARSET)); + + return subscriber; + } } diff --git a/src/main/java/nl/wissehes/javatrain/controller/SiriController.java b/src/main/java/nl/wissehes/javatrain/controller/SiriController.java new file mode 100644 index 0000000..15bc945 --- /dev/null +++ b/src/main/java/nl/wissehes/javatrain/controller/SiriController.java @@ -0,0 +1,38 @@ +package nl.wissehes.javatrain.controller; + +import nl.wissehes.javatrain.DataStore; +import nl.wissehes.javatrain.model.SiriMessage; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +@RestController +@RequestMapping("/siri") +public class SiriController { + + private final DataStore dataStore; + + public SiriController(DataStore dataStore) { + this.dataStore = dataStore; + } + + @GetMapping(path = "/messages") + public List getMessages(@RequestParam(required = false) String topicFilter) { + + if(topicFilter != null) { + return dataStore.getRawSiriMessages().stream() + .filter(message -> message.topic().toLowerCase().contains(topicFilter.toLowerCase())) + .toList(); + } + + return dataStore.getRawSiriMessages(); + } + + public SiriMessage getMessage(int index) { + return dataStore.getRawSiriMessages().get(index); + } + +} diff --git a/src/main/java/nl/wissehes/javatrain/model/SIRI/SiriMessageDocument.java b/src/main/java/nl/wissehes/javatrain/model/SIRI/SiriMessageDocument.java new file mode 100644 index 0000000..6a394ad --- /dev/null +++ b/src/main/java/nl/wissehes/javatrain/model/SIRI/SiriMessageDocument.java @@ -0,0 +1,19 @@ +package nl.wissehes.javatrain.model.SIRI; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement; +import nl.wissehes.javatrain.model.SIRI.heartbeat.HeartbeatNotification; +import nl.wissehes.javatrain.model.SIRI.service.ServiceDelivery; + +@JsonIgnoreProperties(ignoreUnknown = true) +@JacksonXmlRootElement +public class SiriMessageDocument { + + @JacksonXmlProperty(localName = "ServiceDelivery") + public ServiceDelivery serviceDelivery; + + @JacksonXmlProperty(localName = "HeartbeatNotification") + public HeartbeatNotification heartbeatNotification; + +} diff --git a/src/main/java/nl/wissehes/javatrain/model/SIRI/heartbeat/HeartbeatNotification.java b/src/main/java/nl/wissehes/javatrain/model/SIRI/heartbeat/HeartbeatNotification.java new file mode 100644 index 0000000..d154075 --- /dev/null +++ b/src/main/java/nl/wissehes/javatrain/model/SIRI/heartbeat/HeartbeatNotification.java @@ -0,0 +1,30 @@ +package nl.wissehes.javatrain.model.SIRI.heartbeat; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty; + +import java.time.OffsetDateTime; + +/* + 2025-05-01T17:04:30+02:00 + DOVA + true + 2025-05-01T16:09:49+02:00 + */ + +@JsonIgnoreProperties(ignoreUnknown = true) +public class HeartbeatNotification { + + @JacksonXmlProperty(localName = "RequestTimestamp") + public OffsetDateTime requestTimestamp; + + @JacksonXmlProperty(localName = "ProducerRef") + public String producerRef; + + @JacksonXmlProperty(localName = "Status") + public boolean status; + + @JacksonXmlProperty(localName = "ServiceStartedTime") + public OffsetDateTime serviceStartedTime; + +} diff --git a/src/main/java/nl/wissehes/javatrain/model/SIRI/package-info.java b/src/main/java/nl/wissehes/javatrain/model/SIRI/package-info.java new file mode 100644 index 0000000..5e86d3a --- /dev/null +++ b/src/main/java/nl/wissehes/javatrain/model/SIRI/package-info.java @@ -0,0 +1,4 @@ +/** + * Contains the necessary models for the SIRI messages. + */ +package nl.wissehes.javatrain.model.SIRI; \ No newline at end of file diff --git a/src/main/java/nl/wissehes/javatrain/model/SIRI/service/FacilityCondition.java b/src/main/java/nl/wissehes/javatrain/model/SIRI/service/FacilityCondition.java new file mode 100644 index 0000000..9b6d384 --- /dev/null +++ b/src/main/java/nl/wissehes/javatrain/model/SIRI/service/FacilityCondition.java @@ -0,0 +1,29 @@ +package nl.wissehes.javatrain.model.SIRI.service; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty; + +import java.time.OffsetDateTime; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class FacilityCondition { + + @JacksonXmlProperty(localName = "FacilityRef") + public String facilityRef; + + @JacksonXmlProperty(localName = "FacilityStatus") + public FacilityStatus facilityStatus; + + @JacksonXmlProperty(localName = "ValidityPeriod") + public ValidityPeriod validityPeriod; + + @JsonIgnoreProperties(ignoreUnknown = true) + public record FacilityStatus( + @JacksonXmlProperty(localName = "Status") String status + ) { } + + @JsonIgnoreProperties(ignoreUnknown = true) + public record ValidityPeriod( + @JacksonXmlProperty(localName = "StartTime") OffsetDateTime startTime + ) { } +} diff --git a/src/main/java/nl/wissehes/javatrain/model/SIRI/service/FacilityMonitoringDelivery.java b/src/main/java/nl/wissehes/javatrain/model/SIRI/service/FacilityMonitoringDelivery.java new file mode 100644 index 0000000..9cebf59 --- /dev/null +++ b/src/main/java/nl/wissehes/javatrain/model/SIRI/service/FacilityMonitoringDelivery.java @@ -0,0 +1,20 @@ +package nl.wissehes.javatrain.model.SIRI.service; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlElementWrapper; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty; + +import java.time.LocalDateTime; +import java.util.List; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class FacilityMonitoringDelivery { + + @JacksonXmlProperty(localName = "ResponseTimestamp") + public LocalDateTime timestamp; + + @JacksonXmlProperty(localName = "FacilityCondition") + @JacksonXmlElementWrapper(useWrapping = false) + public List facilityCondition; + +} diff --git a/src/main/java/nl/wissehes/javatrain/model/SIRI/service/ServiceDelivery.java b/src/main/java/nl/wissehes/javatrain/model/SIRI/service/ServiceDelivery.java new file mode 100644 index 0000000..64ad21a --- /dev/null +++ b/src/main/java/nl/wissehes/javatrain/model/SIRI/service/ServiceDelivery.java @@ -0,0 +1,20 @@ +package nl.wissehes.javatrain.model.SIRI.service; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty; + +import java.time.OffsetDateTime; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class ServiceDelivery { + + @JacksonXmlProperty(localName = "ResponseTimestamp") + public OffsetDateTime timestamp; + + @JacksonXmlProperty(localName = "ProducerRef") + public String producerRef; + + @JacksonXmlProperty(localName = "FacilityMonitoringDelivery") + public FacilityMonitoringDelivery facilityMonitoringDelivery; + +} diff --git a/src/main/java/nl/wissehes/javatrain/model/SiriMessage.java b/src/main/java/nl/wissehes/javatrain/model/SiriMessage.java new file mode 100644 index 0000000..3df21b6 --- /dev/null +++ b/src/main/java/nl/wissehes/javatrain/model/SiriMessage.java @@ -0,0 +1,7 @@ +package nl.wissehes.javatrain.model; + +public record SiriMessage( + String topic, + String message +) { +}