diff --git a/pom.xml b/pom.xml index 27e527b1d93266c4aae676ffd2ef404199f0108b..3035e9f1946c422d772cb9abb21cde80d302a0d3 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ <modelVersion>4.0.0</modelVersion> <groupId>beamline</groupId> <artifactId>framework</artifactId> - <version>0.0.3</version> + <version>0.1.0</version> <properties> <maven.compiler.source>11</maven.compiler.source> @@ -66,6 +66,8 @@ <artifactId>opencsv</artifactId> <version>5.6</version> </dependency> + + <!-- For testing only --> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> @@ -76,6 +78,19 @@ <groupId>org.hamcrest</groupId> <artifactId>hamcrest</artifactId> <version>2.2</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-broker</artifactId> + <version>5.17.0</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-mqtt</artifactId> + <version>5.17.0</version> + <scope>test</scope> </dependency> </dependencies> diff --git a/src/test/java/beamline/tests/SourcesTest.java b/src/test/java/beamline/tests/SourcesTest.java index 0462e4bf4365f1ccf1d0ee6971d93708abb684f3..3fff9050f9f0d2129aff24f6f8e0faf231e78bad 100644 --- a/src/test/java/beamline/tests/SourcesTest.java +++ b/src/test/java/beamline/tests/SourcesTest.java @@ -5,15 +5,24 @@ import static org.hamcrest.Matchers.hasItems; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrowsExactly; +import java.net.URI; +import java.nio.charset.StandardCharsets; import java.util.LinkedList; import java.util.List; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttPersistenceException; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.junit.jupiter.api.Test; import com.opencsv.CSVParserBuilder; import beamline.exceptions.SourceException; import beamline.sources.CSVLogSource; +import beamline.sources.MQTTXesSource; import beamline.sources.XesLogSource; import beamline.utils.EventUtils; @@ -128,4 +137,69 @@ public class SourcesTest { assertThat(acts, hasItems("a11","a21","a22","a12","a23")); assertThat(caseIds, hasItems("c1","c2","c2","c1","c2")); } + + @Test + public void test_mqtt_1() { + try { + // create mqtt broker + BrokerService brokerService = createBroker(); + brokerService.start(); + brokerService.waitUntilStarted(); + + List<String> acts = new LinkedList<>(); + List<String> caseIds = new LinkedList<>(); + + // create actual source + MQTTXesSource source = new MQTTXesSource("tcp://localhost:9999", "test", "name"); + source.prepare(); + source.getObservable().subscribe((t) -> { + acts.add(EventUtils.getActivityName(t)); + caseIds.add(EventUtils.getCaseId(t)); + }); + + MqttClient client = new MqttClient("tcp://localhost:9999", "clientid", new MemoryPersistence()); + client.connect(); + + publish(client, "c1", "a11"); + publish(client, "c2", "a21"); + publish(client, "c2", "a22"); + publish(client, "c1", "a12"); + publish(client, "c2", "a23"); + + Thread.sleep(100); + + assertThat(acts, hasItems("a11","a21","a22","a12","a23")); + assertThat(caseIds, hasItems("c1","c2","c2","c1","c2")); + + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Test + public void test_mqtt_2() { + MQTTXesSource source = new MQTTXesSource("tcp://localhost:1", "test", "name"); + assertThrowsExactly(SourceException.class, () -> source.prepare()); + } + + protected void publish(MqttClient client, String caseId, String activityName) throws MqttPersistenceException, MqttException { + client.publish("test/name/" + caseId + "/" + activityName, "{}".getBytes(StandardCharsets.UTF_8), 1, false); + } + + protected BrokerService createBroker() throws Exception { + BrokerService brokerService = new BrokerService(); + brokerService.setDeleteAllMessagesOnStartup(true); + brokerService.setPersistent(false); + brokerService.setAdvisorySupport(false); + brokerService.setUseJmx(true); + brokerService.getManagementContext().setCreateConnector(false); + brokerService.setPopulateJMSXUserID(true); + + TransportConnector connector = new TransportConnector(); + connector.setUri(new URI("mqtt://localhost:9999")); + connector.setName("mqtt"); + brokerService.addConnector(connector); + + return brokerService; + } }