Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 22 additions & 6 deletions src/main/java/nl/wissehes/javatrain/DataReceiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import nl.wissehes.javatrain.model.SiriMessage;
import nl.wissehes.javatrain.util.GZipUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -14,28 +15,42 @@
@Component
public class DataReceiver {

private final ZMQ.Socket subscriber;
private final ZMQ.Socket infoplusSubscriber;
private final ZMQ.Socket siriSubscriber;
private final DataStore dataStore;

Logger logger = LoggerFactory.getLogger(DataReceiver.class);

@Value("${infoplus.log-messages:false}")
private boolean shouldLogMessages;

public DataReceiver(ZMQ.Socket subscriber, DataStore dataStore) {
this.subscriber = subscriber;
public DataReceiver(ZmqConfig config, DataStore dataStore) {
this.infoplusSubscriber = config.infoplusSubscriber();
this.siriSubscriber = config.siriSubscriber();
this.dataStore = dataStore;
}

@PostConstruct
public void startListening() {
new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
String topic = subscriber.recvStr(); // Receive topic
byte[] messageBytes = subscriber.recv(); // Receive the compressed message as bytes
String topic = infoplusSubscriber.recvStr(); // Receive topic
byte[] messageBytes = infoplusSubscriber.recv(); // Receive the compressed message as bytes
this.handleMessage(topic, messageBytes);
}
}).start();

new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
String topic = siriSubscriber.recvStr(); // Receive topic
byte[] messageBytes = siriSubscriber.recv(); // Receive the compressed message as bytes
if(!topic.startsWith("/HTM") && !topic.startsWith("/GVB")) {
System.out.println("Topic: " + topic);
System.out.println("Message: " + messageBytes.length);
this.handleMessage(topic, messageBytes);
}
}
}).start();
}

private void handleMessage(String topic, byte[] messageBytes) {
Expand All @@ -50,6 +65,7 @@ private void handleMessage(String topic, byte[] messageBytes) {
case ZmqConfig.DVS_TOPIC -> dataStore.addDeparture(message);
case ZmqConfig.RIT_TOPIC -> dataStore.addJourney(message);
case ZmqConfig.POS_TOPIC -> dataStore.addPosition(message);
default -> dataStore.addRawSiriMessage(new SiriMessage(topic, message));
}
} catch (IOException e) {
logger.error("Failed to decompress message: {}", e.getMessage());
Expand All @@ -58,6 +74,6 @@ private void handleMessage(String topic, byte[] messageBytes) {

@PreDestroy
public void stopListening() {
subscriber.close(); // Clean up resources
infoplusSubscriber.close(); // Clean up resources
}
}
14 changes: 14 additions & 0 deletions src/main/java/nl/wissehes/javatrain/DataStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import nl.wissehes.javatrain.mapper.PositionsMapper;
import nl.wissehes.javatrain.model.NDOV.DVS.DepartureDocument;
import nl.wissehes.javatrain.model.NDOV.RIT.JourneyDocument;
import nl.wissehes.javatrain.model.SiriMessage;
import nl.wissehes.javatrain.model.departure.Departure;
import nl.wissehes.javatrain.model.departure.TrainStatus;
import nl.wissehes.javatrain.model.journey.Journey;
Expand Down Expand Up @@ -35,6 +36,8 @@ public final class DataStore {

private final Map<String, Station> stations = new HashMap<>();

private final List<SiriMessage> rawSiriMessages = new LinkedList<>();

private DataStore() {
}

Expand Down Expand Up @@ -87,6 +90,13 @@ public void addPosition(String message) {
}
}

/**
* Add a raw SIRI message to the data store
*/
public void addRawSiriMessage(SiriMessage message) {
rawSiriMessages.add(message);
}

/**
* Get the departures
*/
Expand Down Expand Up @@ -145,6 +155,10 @@ public List<String> getRawPositions() {
return rawPositions;
}

public List<SiriMessage> getRawSiriMessages() {
return rawSiriMessages;
}

/**
* Get all found stations
*/
Expand Down
19 changes: 16 additions & 3 deletions src/main/java/nl/wissehes/javatrain/ZmqConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Service;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

@Configuration
@Service
public class ZmqConfig {

@Value("${infoplus.endpoint}")
private String ZMQ_SUBSCRIBER_ADDRESS;

private String SIRI_ADDRESS = "tcp://pubsub.besteffort.ndovloket.nl:7666";

/** Departures */
public static final String DVS_TOPIC = "/RIG/InfoPlusDVSInterface4";
/** Arrivals */
Expand All @@ -22,8 +25,8 @@ public class ZmqConfig {
/** Positions */
public static final String POS_TOPIC = "/RIG/NStreinpositiesInterface5";

@Bean(destroyMethod = "close")
public ZMQ.Socket zmqSubscriber() {
// @Bean(destroyMethod = "close")
public ZMQ.Socket infoplusSubscriber() {
ZContext context = new ZContext();
ZMQ.Socket subscriber = context.createSocket(SocketType.SUB);
subscriber.connect(ZMQ_SUBSCRIBER_ADDRESS);
Expand All @@ -34,4 +37,14 @@ public ZMQ.Socket zmqSubscriber() {

return subscriber;
}

public ZMQ.Socket siriSubscriber() {
ZContext context = new ZContext();
ZMQ.Socket subscriber = context.createSocket(SocketType.SUB);
subscriber.connect(SIRI_ADDRESS);

subscriber.subscribe("/".getBytes(ZMQ.CHARSET));

return subscriber;
}
}
38 changes: 38 additions & 0 deletions src/main/java/nl/wissehes/javatrain/controller/SiriController.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package nl.wissehes.javatrain.controller;

import nl.wissehes.javatrain.DataStore;
import nl.wissehes.javatrain.model.SiriMessage;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

@RestController
@RequestMapping("/siri")
public class SiriController {

private final DataStore dataStore;

public SiriController(DataStore dataStore) {
this.dataStore = dataStore;
}

@GetMapping(path = "/messages")
public List<SiriMessage> getMessages(@RequestParam(required = false) String topicFilter) {

if(topicFilter != null) {
return dataStore.getRawSiriMessages().stream()
.filter(message -> message.topic().toLowerCase().contains(topicFilter.toLowerCase()))
.toList();
}

return dataStore.getRawSiriMessages();
}

public SiriMessage getMessage(int index) {
return dataStore.getRawSiriMessages().get(index);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package nl.wissehes.javatrain.model.SIRI;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement;
import nl.wissehes.javatrain.model.SIRI.heartbeat.HeartbeatNotification;
import nl.wissehes.javatrain.model.SIRI.service.ServiceDelivery;

@JsonIgnoreProperties(ignoreUnknown = true)
@JacksonXmlRootElement
public class SiriMessageDocument {

@JacksonXmlProperty(localName = "ServiceDelivery")
public ServiceDelivery serviceDelivery;

@JacksonXmlProperty(localName = "HeartbeatNotification")
public HeartbeatNotification heartbeatNotification;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package nl.wissehes.javatrain.model.SIRI.heartbeat;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty;

import java.time.OffsetDateTime;

/*
<RequestTimestamp>2025-05-01T17:04:30+02:00</RequestTimestamp>
<ProducerRef>DOVA</ProducerRef>
<Status>true</Status>
<ServiceStartedTime>2025-05-01T16:09:49+02:00</ServiceStartedTime>
*/

@JsonIgnoreProperties(ignoreUnknown = true)
public class HeartbeatNotification {

@JacksonXmlProperty(localName = "RequestTimestamp")
public OffsetDateTime requestTimestamp;

@JacksonXmlProperty(localName = "ProducerRef")
public String producerRef;

@JacksonXmlProperty(localName = "Status")
public boolean status;

@JacksonXmlProperty(localName = "ServiceStartedTime")
public OffsetDateTime serviceStartedTime;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
* Contains the necessary models for the SIRI messages.
*/
package nl.wissehes.javatrain.model.SIRI;
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package nl.wissehes.javatrain.model.SIRI.service;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty;

import java.time.OffsetDateTime;

@JsonIgnoreProperties(ignoreUnknown = true)
public class FacilityCondition {

@JacksonXmlProperty(localName = "FacilityRef")
public String facilityRef;

@JacksonXmlProperty(localName = "FacilityStatus")
public FacilityStatus facilityStatus;

@JacksonXmlProperty(localName = "ValidityPeriod")
public ValidityPeriod validityPeriod;

@JsonIgnoreProperties(ignoreUnknown = true)
public record FacilityStatus(
@JacksonXmlProperty(localName = "Status") String status
) { }

@JsonIgnoreProperties(ignoreUnknown = true)
public record ValidityPeriod(
@JacksonXmlProperty(localName = "StartTime") OffsetDateTime startTime
) { }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package nl.wissehes.javatrain.model.SIRI.service;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlElementWrapper;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty;

import java.time.LocalDateTime;
import java.util.List;

@JsonIgnoreProperties(ignoreUnknown = true)
public class FacilityMonitoringDelivery {

@JacksonXmlProperty(localName = "ResponseTimestamp")
public LocalDateTime timestamp;

@JacksonXmlProperty(localName = "FacilityCondition")
@JacksonXmlElementWrapper(useWrapping = false)
public List<FacilityCondition> facilityCondition;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package nl.wissehes.javatrain.model.SIRI.service;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty;

import java.time.OffsetDateTime;

@JsonIgnoreProperties(ignoreUnknown = true)
public class ServiceDelivery {

@JacksonXmlProperty(localName = "ResponseTimestamp")
public OffsetDateTime timestamp;

@JacksonXmlProperty(localName = "ProducerRef")
public String producerRef;

@JacksonXmlProperty(localName = "FacilityMonitoringDelivery")
public FacilityMonitoringDelivery facilityMonitoringDelivery;

}
7 changes: 7 additions & 0 deletions src/main/java/nl/wissehes/javatrain/model/SiriMessage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package nl.wissehes.javatrain.model;

public record SiriMessage(
String topic,
String message
) {
}