1. Einführung

Slides

2. MQTT

2.1. Sender

Die Sender Applikation ist eine Quarkus-Anwendung, die unter anderem auch Qute nutzt.
Mit folgendem Aufbau:

mqttSenderStructure

2.1.1. Dependencies

pom.xml
<!--[...]-->
<!--nur ein Ausschnitt der dependencies, nicht die ganze pom-->
<dependencies>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-messaging-mqtt</artifactId>
    </dependency>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-rest-qute</artifactId>
    </dependency>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-rest-jackson</artifactId>
    </dependency>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-arc</artifactId>
    </dependency>
</dependencies>
<!--[...]-->

2.1.2. Config

In der Config erstellen wir, einen Channel namens location und definieren diesen als outgoing, das heißt, dass wir über diesen Channel Nachrichten an den Broker senden.

Config funktioniert nur für den MQTT-Broker der Schule
application.properties
quarkus.http.port=8080
quarkus.package.type=uber-jar

base.topic=owntracks/custom/ (1)

mp.messaging.outgoing.location.connector=smallrye-mqtt
mp.messaging.outgoing.location.host=10.191.112.90 (2)
mp.messaging.outgoing.location.port=1883

mp.messaging.outgoing.location.username=student
mp.messaging.outgoing.location.password=passme
1 Wir definieren keine fixe topic, da wir diese im Code dynamisch bestimmen
2 Host zu "vm90.htl-leonding.ac.at" ändern, falls das Projekt außerhalb der Leocloud läuft
Falls man nur an eine bestimmte Topic senden will: mp.messaging.outgoing.location.topic=…​

2.1.3. Code

Code für das DTO
LocationDTO
package at.htl.leonding.entity;

import com.fasterxml.jackson.annotation.JsonProperty;

public class LocationDTO {

    @JsonProperty("_type")
    private String type;
    @JsonProperty("lat")
    private double lat;
    @JsonProperty("lon")
    private double lon;
    private String tid;

    public LocationDTO(String type, double lat, double lon, String tid) {
        this.type = type;
        this.lat = lat;
        this.lon = lon;
        this.tid = tid;
    }

    public LocationDTO() {
    }

    // region getter und setter

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public double getLat() {
        return lat;
    }

    public void setLat(double lat) {
        this.lat = lat;
    }

    public double getLon() {
        return lon;
    }

    public void setLon(double lon) {
        this.lon = lon;
    }

    public String getTid() {
        return tid;
    }

    public void setTid(String tid) {
        this.tid = tid;
    }

    // endregion
}
SomePage
    // [...]
    @ConfigProperty(name = "base.topic")
    String baseTopic; (1)

    @Inject
    @Channel("location") (3)
    Emitter<LocationDTO> emitter; (2)
    // [...]
1 Wir holen uns die Basis für unsere Topic aus den Configs
2 Wir injizieren einen Emitter, den wir verwenden werden, um Nachrichten zu versenden
3 Der Emitter sendet über den Channel location, den wir in der Config konfiguriert haben

Zum Versenden der Daten nutzen wir eine Methode, die über einen REST-Endpunkt aufgerufen wird.

SomePage
    // [...]
    @POST
    public void locationSender(
            @FormParam("name") String name, (1)
            @FormParam("latitude") double latitude,
            @FormParam("longitude") double longitude
    ) {
        String topic = baseTopic + name; (2)

        LocationDTO locationDTO = new LocationDTO(
                "location", // typ
                latitude,
                longitude,
                name
        );

        SendingMqttMessageMetadata metadata = new SendingMqttMessageMetadata( (3)
                topic,                  // topic
                MqttQoS.AT_LEAST_ONCE,  // Qos level (1)
                true                    // isRetain, gibt an ob die Nachricht permanent ist
        );

        MqttMessage<LocationDTO> message = MqttMessage.of(metadata, locationDTO); (4)

        emitter.send(message);
    }
    // [...]
1 Der Endpunkt wird von einem HTML Form aufgerufen, daher haben wir 3 @FormParam's
2 Erstellen der Topic, als Beispiel: owntracks/custom/Luka
3 In den Metadaten können wir die Topic bestimmen, daher konnte man diesen Schritt in der Config-Datei weglassen
4 Kombination von Metadaten und unserem DTO zu einer MqttMessage
Good to know: Imports
import io.netty.handler.codec.mqtt.MqttQoS;
import io.smallrye.reactive.messaging.mqtt.MqttMessage;
import io.smallrye.reactive.messaging.mqtt.SendingMqttMessageMetadata;
Qute Template
SomePage
    // [...]
    private final Template page;

    public SomePage(Template page) {
        this.page = requireNonNull(page, "page is required");
    }

    @GET
    @Produces(MediaType.TEXT_HTML)
    public TemplateInstance get() {
        return page.instance();
    }
    // [...]
Frontend
page.qute.html
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>MQTT-Sender</title>

    <link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css" integrity="sha256-p4NxAoJBhIIN+hmNHrzRCf9tD/miZyoHS5obTRR9BMY=" crossorigin="" />
    <script src="https://unpkg.com/leaflet@1.9.4/dist/leaflet.js" integrity="sha256-20nQCchB9co0qIjJZRGuk2/Z9VM+kNiyxNV1lvTlZBo=" crossorigin=""></script>

    <style>
        #map{
            height: 90vh;
        }

        #container {
            height: 8vh;

            margin: 1vh 1vw 1vh 1vw;

            display: flex;

            align-items: center;
            justify-content: center;
        }

        html, body{
            overflow: hidden;
            margin: 0;
        }

    </style>
</head>
<body>
<div id="container">
    <form id="locationForm" action="/some-page" method="post">
        <label for="name">Enter your name:</label>
        <input type="text" id="name" name="name" required>

        <input type="text" id="latitude" name="latitude" required readonly placeholder="latitude">
        <input type="text" id="longitude" name="longitude" required readonly placeholder="longitude">

        <button type="submit">Send location!</button>
    </form>
</div>


<div id="map"></div>

<script>
    let map; // Declare map variable globally
    let marker; // Declare marker variable globally

    // Initialize the map
    function initMap() {
        map = L.map('map').setView([0, 0], 2); // Initialize map with a default view

        L.tileLayer('https://tile.openstreetmap.org/\{z}\/\{x}\/\{y}\.png').addTo(map);

        map.on('click', onMapClick);
    }

    function onMapClick(e) {
        // Get latitude and longitude from click event
        const lat = e.latlng.lat;
        const lon = e.latlng.lng;

        console.log("lat: " + lat);
        console.log("lon: " + lon);

        document.getElementById('latitude').value = lat;
        document.getElementById('longitude').value = lon;

        // If the marker is not yet created, create it
        if (!marker) {
            marker = L.marker([lat, lon]).addTo(map)
                .bindPopup('You clicked here!')
                .openPopup();
        } else {
            // Move the existing marker to the new position
            marker.setLatLng([lat, lon]).update();
        }
    }

    document.getElementById('locationForm').addEventListener('submit', function(event) {
        // Prevent the default form submission
        event.preventDefault();

        // Check if name is filled
        const name = document.getElementById('name').value;
        const lat = document.getElementById('latitude').value;
        const lon = document.getElementById('longitude').value;

        if (!name) {
            alert('Please enter your name.');
            return; // Stop form submission if name is empty
        }

        if (!lat || !lon) {
            alert('Please click on the map to set a location before submitting.');
            return; // Stop form submission if lat or lon are missing
        }

        // If all checks pass, submit the form
        this.submit();
    });

    initMap();
</script>

</body>
</html>

2.2. Receiver

Die Empfänger Applikation ist eine Quarkus-Anwendung, die unter anderem auch Qute nutzt.
Mit folgendem Aufbau:

mqttReceiverStructure

2.2.1. Dependencies

pom.xml
<!--[...]-->
<!--nur ein Ausschnitt der dependencies, nicht die ganze pom-->
<dependencies>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-messaging-mqtt</artifactId>
    </dependency>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-rest-qute</artifactId>
    </dependency>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-rest-jackson</artifactId>
    </dependency>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-arc</artifactId>
    </dependency>
</dependencies>
<!--[...]-->

2.2.2. Config

In der Config erstellen wir, einen Channel namens location und definieren diesen als incoming, das heißt, dass wir über diesen Channel Nachrichten vom Broker erhalten.

Config funktioniert nur für den MQTT-Broker der Schule
application.properties
quarkus.http.port=8081
quarkus.package.type=uber-jar

mp.messaging.incoming.location.connector=smallrye-mqtt
mp.messaging.incoming.location.host=vm90.htl-leonding.ac.at
mp.messaging.incoming.location.port=1883
mp.messaging.incoming.location.topic=owntracks/custom/# (1)

mp.messaging.incoming.location.qos=1 (2)

mp.messaging.incoming.location.username=student
mp.messaging.incoming.location.password=passme
1 Im Vergleich zur vorherigen config, bestimmen wir hier eine topic, wobei wir eine Wildcard verwenden, das heißt, wir bekommen alle topics, die denselben Anfang haben, zum Beispiel: owntracks/custom/Luka oder owntracks/custom/Walter
2 Beim Sender mussten wir das Qos-Level im Code bestimmen und hier machen wir es in der config
Falls man nur an eine bestimmte Topic erhalten will: mp.messaging.incoming.location.topic=owntracks/custom/Luka
Mehr zu Wildcards

Es gibt 2 verschiedene Wildcards:

  • +

    • Diese Wildcard findet nur topics die sich auf der selben Ebene befinden,
      wie zum Beispiel: owntracks/custom/luka

  • #

    • Diese Wildcard findet auch topics, die mehrere Ebenen tief sind,
      wie zum Beispiel: owntracks/custom/groupOne/luka

2.2.3. Code

Der Code für das DTO kann von oben kopiert werden
SomePage
    // [...]
    @Inject
    ObjectMapper mapper; (1)

    private final Map<String, String> tIdsMap = new HashMap<>();; (2)
    // [...]
1 Wir injizieren einen mapper, um die eintreffenden byte[] zu unser DTOs mappen zu können
2 Wir erstellen eine Hashmap, die als Key die Namen verwendet und als value dazu die Koordinaten hält
SomePage
    // [...]
    @Incoming("location")
    public void receiveLocation(byte[] data){
        try {
            LocationDTO myLocation = mapper.readValue(data, LocationDTO.class); (1)

            // sometimes the type is lwt and every other field is zero or null
            if(!Objects.equals(myLocation.getType(), "location") || myLocation.getTid() == null)
                return;

            synchronized (tIdsMap) { (2)
                tIdsMap.put(myLocation.getTid(), String.format("%s %s", myLocation.getLon(), myLocation.getLat()));
            }

        } catch (Exception e) {
            Log.warn(e);
        }
    }
    // [...]
1 Das byte[] wird zu unserem LocatinDTO gemappt
2 Die Methode, die das Qute-Template erstellt und die Methode zum Empfangen der Mqtt-Nachrichten laufen auf asynchron auf 2 verschiedenen Threads, daher braucht man das synchronized um die Hashmap zu befüllen, sonst hat man im Frontend immer eine leere Hashmap

Mehr zu synchronized: hier

Qute Template
SomePage
    // [...]
    private final Template page;

    public SomePage(Template page) {
        this.page = requireNonNull(page, "page is required");
    }

    @GET
    @Produces(MediaType.TEXT_HTML)
    public TemplateInstance get() {
        return page.data("tIds", tIdsMap, "keys", tIdsMap.keySet().stream().toList());
    }
    // [...]
Frontend
page.qute.html
{@java.util.ArrayList<String> keys}
{@java.util.HashMap tIds}

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>Hello {name ?: "Qute"}</title>

    <link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css" integrity="sha256-p4NxAoJBhIIN+hmNHrzRCf9tD/miZyoHS5obTRR9BMY=" crossorigin="" />
    <script src="https://unpkg.com/leaflet@1.9.4/dist/leaflet.js" integrity="sha256-20nQCchB9co0qIjJZRGuk2/Z9VM+kNiyxNV1lvTlZBo=" crossorigin=""></script>

    <style>
        #map{
            height: 90vh;
        }

        #selectionContainer{
            height: 8vh;
        }

        html, body{
            overflow: hidden;
            margin: 0;
        }
    </style>
</head>
<body>

<div id="selectionContainer">
    <label for="personSelect">Choose a person:</label>
    <select name="person" id="personSelect" onchange="checkSelection()">
        <option value="">select</option>
        {#each key in keys}
            <option value="{key}">{key}</option>
        {/each}
    </select>
</div>

<div id="map"></div>

<script>
    let tIds = {
        {#each key in keys}
        "{key}": "{tIds.get(key)}",
        {/each}
    };

    let map; // Declare map variable globally
    let marker; // Declare marker variable globally

    // Initialize the map
    function initMap() {
        map = L.map('map').setView([0, 0], 2); // Initialize map with a default view
        L.tileLayer('https://tile.openstreetmap.org/\{z}\/\{x}\/\{y}\.png').addTo(map);
    }

    // Change the marker position based on the selected person
    function checkSelection() {
        const selector = document.getElementById("personSelect").value; // Get the selected value

        if (selector) {
            const coordinates = tIds[selector].split(" "); // Split the coordinates string
            const lat = parseFloat(coordinates[1]); // Convert to float
            const lon = parseFloat(coordinates[0]); // Convert to float

            // Update map view
            map.setView([lat, lon], 13); // Change the map view

            // If the marker is not yet created, create it
            if (!marker) {
                marker = L.marker([lat, lon]).addTo(map)
                    .bindPopup('Last Position')
                    .openPopup();
            } else {
                // Move the existing marker to the new position
                marker.setLatLng([lat, lon]).update();
            }
        }
    }

    initMap();
</script>

</body>
</html>

3. Kafka

coming soon…​

3.1. Sender

Die Sender Applikation ist eine Quarkus-Anwendung. Mit folgendem Aufbau:

kafkaSenderStructure

3.1.1. Dependencies

pom.xml
<!--[...]-->
<!--nur ein Ausschnitt der dependencies, nicht die ganze pom-->
<dependencies>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-arc</artifactId>
    </dependency>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-messaging-kafka</artifactId>
    </dependency>
</dependencies>
<!--[...]-->

3.1.2. Config

application.properties
quarkus.http.port=8083
quarkus.package.jar.type=uber-jar

# Outgoing channel (producing messages to Kafka)
mp.messaging.outgoing.stats-out.connector=smallrye-kafka
mp.messaging.outgoing.stats-out.topic=stats (1)
1 Wir definieren den Channel stats-out über den wir Daten an die Topic stats schicken
Wir definieren keine Server Adresse, da Kafka einen default Broker für uns startet
Genaueres

Beim Starten von der Kafka App

INFO  [io.qua.kaf.cli.dep.DevServicesKafkaProcessor] (build-7)
Dev Services for Kafka started.
Other Quarkus applications in dev mode will find the broker automatically.
For Quarkus applications in production mode, you can connect to this by starting your application with
-Dkafka.bootstrap.servers=OUTSIDE://localhost:32773
brokerInDocker

3.1.3. Code

MyMessagingApplication
    // [...]
    @Inject
    @Channel("stats-out")
    Emitter<String> emitter; (1)

    void onStart(@Observes StartupEvent ev) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); (2)

        scheduler.scheduleAtFixedRate(this::sendStats, 0, 5, TimeUnit.SECONDS); (3)
    }
    // [...]
1 Wir injizieren einen Emitter mit unserem definierten Channel
2 Wir erstellen einen neuen Threadpool mit einem Thread
3 Wir rufen die Methode sendStats alle 5 Sekunden auf
MyMessagingApplication
    // [...]
    public void sendStats() {
        OperatingSystemMXBean osBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();

        // CPU Load (System-wide)
        double cpuLoad = osBean.getCpuLoad() * 100;

        // RAM Usage (Total and Free memory) in bytes
        long totalPhysicalMemorySize = osBean.getTotalMemorySize();
        long freePhysicalMemorySize = osBean.getFreeMemorySize();
        long usedMemory = totalPhysicalMemorySize - freePhysicalMemorySize;

        // Convert memory values to gigabytes
        double totalMemoryGB = totalPhysicalMemorySize / (double) (1024 * 1024 * 1024);
        double freeMemoryGB = freePhysicalMemorySize / (double) (1024 * 1024 * 1024);
        double usedMemoryGB = usedMemory / (double) (1024 * 1024 * 1024);

        // Round CPU and memory values
        String formattedCpuLoad = String.format("%.0f", cpuLoad);
        String formattedTotalMemory = String.format("%.1f", totalMemoryGB);
        String formattedFreeMemory = String.format("%.1f", freeMemoryGB);
        String formattedUsedMemory = String.format("%.1f", usedMemoryGB);

        // Send formatted metrics to Kafka
        emitter.send(String.format(
                "cpu: %s%%, ram: %sGB, frei: %sGB, gebraucht: %sGB",
                formattedCpuLoad, formattedTotalMemory, formattedFreeMemory, formattedUsedMemory
        ));
    }

    // [...]

3.2. Receiver

Die Empfänger Applikation ist eine Quarkus-Anwendung.
Mit folgendem Aufbau:

kafkaReceiverStructure

3.2.1. Dependencies

pom.xml
<!--[...]-->
<!--nur ein Ausschnitt der dependencies, nicht die ganze pom-->
<dependencies>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-messaging-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-rest-jackson</artifactId>
    </dependency>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-arc</artifactId>
    </dependency>
    <dependency>
        <groupId>io.reactivex.rxjava3</groupId>
        <artifactId>rxjava</artifactId>
    <version>3.1.9</version>
    </dependency>
</dependencies>
<!--[...]-->

3.2.2. Config

application.properties
quarkus.http.port=8082
quarkus.package.jar.type=uber-jar

# Incoming channel (consuming messages from Kafka)
mp.messaging.incoming.stats-in.connector=smallrye-kafka
mp.messaging.incoming.stats-in.topic=stats (1)
mp.messaging.incoming.stats-in.auto.offset.reset=latest (2)
1 Wir definieren den Channel stats-in über den wir Daten von der Topic stats erhalten
2 Wir definieren den Empfänger so, dass er sich die neusten Werte holt, earliest würde alle Werte der Topic abrufen

3.2.3. Code

MyMessagingApplication
    // [...]
    private final PublishSubject<String> statsSubject = PublishSubject.create(); (1)

    @Incoming("stats-in") (2)
    public void consumeStats(String stats) {
        statsSubject.onNext(stats); (3)
        System.out.println("Received stats: " + stats);
    }
    // [...]
1 Wir erstellen ein global PublishSubject variable, auf die wir uns später subscriben können
2 Bei neuen Daten über den Channel stats-in wird die Methode consumeStats aufgerufen
3 Wir füllen die States in unsere globale Variable
MyMessagingApplication
    // [...]
    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS) (1)
    public void streamStats(@Context Sse sse, @Context SseEventSink eventSink) throws InterruptedException {
        statsSubject.subscribe( (2)
                c -> {
                    eventSink.send(sse.newEventBuilder().data(c).build()); (3)
                },
                error -> {
                    Log.warn(error.getMessage());
                }
        );
    }
    // [...]
1 Diese Methode produziert Server sent events, und kann daher Daten an den Browser senden
2 Wir subscriben uns auf unsere globale Variable
3 Bei jeder Änderung verschicken wir ein neues Event
index.html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Real-Time Stats</title>
</head>
<body>
<h1>PC Stats</h1>
<pre id="stats"></pre>

<script>
    const statsElement = document.getElementById('stats');

    let evtSource = new EventSource("http://localhost:8082/stats/stream"); (1)
    evtSource.onmessage = function(e) {
        statsElement.textContent = e.data; (2)
    };
</script>
</body>
</html>
1 Wir definieren unsere streamStats Methode als Eventsource
2 Bei jeder neuen Message ändern wir den Inhalt unseres '<pre>' Html-Tags