Skip to content
Snippets Groups Projects
Commit 2fd7a7a9 authored by Andrea Burattin's avatar Andrea Burattin
Browse files

Added MQTT xes source

parent e54e539d
No related branches found
No related tags found
No related merge requests found
......@@ -2,12 +2,11 @@ package beamline.sources;
import java.util.Date;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.deckfour.xes.extension.std.XConceptExtension;
import org.deckfour.xes.extension.std.XTimeExtension;
import org.deckfour.xes.factory.XFactory;
import org.deckfour.xes.factory.XFactoryNaiveImpl;
import org.deckfour.xes.model.XEvent;
import org.deckfour.xes.model.XTrace;
import org.eclipse.paho.client.mqttv3.IMqttClient;
......@@ -17,81 +16,62 @@ import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableEmitter;
import io.reactivex.rxjava3.core.ObservableOnSubscribe;
import io.reactivex.rxjava3.subjects.PublishSubject;
public class MQTTXesSource implements XesSource {
private static XFactory xesFactory = new XFactoryNaiveImpl();
private String processName;
private String brokerHost;
private String topicBase;
private PublishSubject<XTrace> ps;
public MQTTXesSource(String brokerHost, String topicBase, String processName) {
this.brokerHost = brokerHost;
this.topicBase = topicBase;
this.processName = processName;
// this.stream = Stream.generate(new Supplier<XTrace>() {
// @Override
// public XTrace get() {
// try {
// return queue.take();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// return null;
// }
// });
this.ps = PublishSubject.create();
}
@Override
public Observable<XTrace> getObservable() {
return Observable.create(new ObservableOnSubscribe<XTrace>() {
return ps;
}
@Override
public void subscribe(@NonNull ObservableEmitter<@NonNull XTrace> emitter) throws Throwable {
// TODO Auto-generated method stub
public void prepare() throws Exception {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setKeepAliveInterval(30);
}
IMqttClient myClient = new MqttClient(brokerHost, UUID.randomUUID().toString());
myClient.setCallback(new MqttCallback() {
});
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
int posLastSlash = topic.lastIndexOf("/");
String partBeforeActName = topic.substring(0, posLastSlash);
String activityName = topic.substring(posLastSlash + 1);
String caseId = partBeforeActName.substring(partBeforeActName.lastIndexOf("/") + 1);
XEvent event = xesFactory.createEvent();
XConceptExtension.instance().assignName(event, activityName);
XTimeExtension.instance().assignTimestamp(event, new Date());
XTrace eventWrapper = xesFactory.createTrace();
XConceptExtension.instance().assignName(eventWrapper, caseId);
eventWrapper.add(event);
ps.onNext(eventWrapper);
}
@Override
public void prepare() throws Exception {
// MqttConnectOptions options = new MqttConnectOptions();
// options.setCleanSession(true);
// options.setKeepAliveInterval(30);
//
// IMqttClient myClient = new MqttClient(brokerHost, UUID.randomUUID().toString());
// myClient.setCallback(new MqttCallback() {
//
// @Override
// public void messageArrived(String topic, MqttMessage message) throws Exception {
// int posLastSlash = topic.lastIndexOf("/");
// String partBeforeActName = topic.substring(0, posLastSlash);
// String activityName = topic.substring(posLastSlash + 1);
// String caseId = partBeforeActName.substring(partBeforeActName.lastIndexOf("/") + 1);
//
// XEvent event = xesFactory.createEvent();
// XConceptExtension.instance().assignName(event, activityName);
// XTimeExtension.instance().assignTimestamp(event, new Date());
// XTrace eventWrapper = xesFactory.createTrace();
// XConceptExtension.instance().assignName(eventWrapper, caseId);
// eventWrapper.add(event);
//
// queue.add(eventWrapper);
// }
//
// @Override
// public void deliveryComplete(IMqttDeliveryToken token) { }
//
// @Override
// public void connectionLost(Throwable cause) { }
// });
// myClient.connect(options);
// myClient.subscribe(topicBase + "/" + processName + "/#");
public void deliveryComplete(IMqttDeliveryToken token) { }
@Override
public void connectionLost(Throwable cause) { }
});
myClient.connect(options);
myClient.subscribe(topicBase + "/" + processName + "/#");
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment