From d473ce34f65737477f738f8f943ceae0575763bb Mon Sep 17 00:00:00 2001
From: Andrea Burattin <andrea.burattin@gmail.com>
Date: Thu, 17 Mar 2022 14:44:02 +0100
Subject: [PATCH] Completed tests, version 0.1.0

---
 pom.xml                                       | 17 ++++-
 src/test/java/beamline/tests/SourcesTest.java | 74 +++++++++++++++++++
 2 files changed, 90 insertions(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 27e527b..3035e9f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
 	<modelVersion>4.0.0</modelVersion>
 	<groupId>beamline</groupId>
 	<artifactId>framework</artifactId>
-	<version>0.0.3</version>
+	<version>0.1.0</version>
 
 	<properties>
 		<maven.compiler.source>11</maven.compiler.source>
@@ -66,6 +66,8 @@
 			<artifactId>opencsv</artifactId>
 			<version>5.6</version>
 		</dependency>
+		
+		<!-- For testing only -->
 		<dependency>
 			<groupId>org.junit.jupiter</groupId>
 			<artifactId>junit-jupiter-api</artifactId>
@@ -76,6 +78,19 @@
 			<groupId>org.hamcrest</groupId>
 			<artifactId>hamcrest</artifactId>
 			<version>2.2</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.activemq</groupId>
+			<artifactId>activemq-broker</artifactId>
+			<version>5.17.0</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.activemq</groupId>
+			<artifactId>activemq-mqtt</artifactId>
+			<version>5.17.0</version>
+			<scope>test</scope>
 		</dependency>
 	</dependencies>
 
diff --git a/src/test/java/beamline/tests/SourcesTest.java b/src/test/java/beamline/tests/SourcesTest.java
index 0462e4b..3fff905 100644
--- a/src/test/java/beamline/tests/SourcesTest.java
+++ b/src/test/java/beamline/tests/SourcesTest.java
@@ -5,15 +5,24 @@ import static org.hamcrest.Matchers.hasItems;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
 
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.junit.jupiter.api.Test;
 
 import com.opencsv.CSVParserBuilder;
 
 import beamline.exceptions.SourceException;
 import beamline.sources.CSVLogSource;
+import beamline.sources.MQTTXesSource;
 import beamline.sources.XesLogSource;
 import beamline.utils.EventUtils;
 
@@ -128,4 +137,69 @@ public class SourcesTest {
 		assertThat(acts, hasItems("a11","a21","a22","a12","a23"));
 		assertThat(caseIds, hasItems("c1","c2","c2","c1","c2"));
 	}
+
+	@Test
+	public void test_mqtt_1() {
+		try {
+			// create mqtt broker
+			BrokerService brokerService = createBroker();
+			brokerService.start();
+			brokerService.waitUntilStarted();
+			
+			List<String> acts = new LinkedList<>();
+			List<String> caseIds = new LinkedList<>();
+			
+			// create actual source
+			MQTTXesSource source = new MQTTXesSource("tcp://localhost:9999", "test", "name");
+			source.prepare();
+			source.getObservable().subscribe((t) -> {
+				acts.add(EventUtils.getActivityName(t));
+				caseIds.add(EventUtils.getCaseId(t));
+			});
+
+			MqttClient client = new MqttClient("tcp://localhost:9999", "clientid", new MemoryPersistence());
+			client.connect();
+			
+			publish(client, "c1", "a11");
+			publish(client, "c2", "a21");
+			publish(client, "c2", "a22");
+			publish(client, "c1", "a12");
+			publish(client, "c2", "a23");
+			
+			Thread.sleep(100);
+			
+			assertThat(acts, hasItems("a11","a21","a22","a12","a23"));
+			assertThat(caseIds, hasItems("c1","c2","c2","c1","c2"));
+			
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+	}
+	
+	@Test
+	public void test_mqtt_2() {
+		MQTTXesSource source = new MQTTXesSource("tcp://localhost:1", "test", "name");
+		assertThrowsExactly(SourceException.class, () -> source.prepare());
+	}
+	
+	protected void publish(MqttClient client, String caseId, String activityName) throws MqttPersistenceException, MqttException {
+		client.publish("test/name/" + caseId + "/" + activityName, "{}".getBytes(StandardCharsets.UTF_8), 1, false);
+	}
+	
+	protected BrokerService createBroker() throws Exception {
+		BrokerService brokerService = new BrokerService();
+		brokerService.setDeleteAllMessagesOnStartup(true);
+		brokerService.setPersistent(false);
+		brokerService.setAdvisorySupport(false);
+		brokerService.setUseJmx(true);
+		brokerService.getManagementContext().setCreateConnector(false);
+		brokerService.setPopulateJMSXUserID(true);
+
+		TransportConnector connector = new TransportConnector();
+		connector.setUri(new URI("mqtt://localhost:9999"));
+		connector.setName("mqtt");
+		brokerService.addConnector(connector);
+
+		return brokerService;
+	}
 }
-- 
GitLab