Skip to content
Snippets Groups Projects
Commit b90beda0 authored by Andrea Burattin's avatar Andrea Burattin
Browse files

Started the conversion to Apache Flink

parent 06159b98
Branches
Tags
No related merge requests found
Showing
with 556 additions and 354 deletions
......@@ -24,7 +24,7 @@
<attribute name="test" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-11">
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
......
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.targetPlatform=11
org.eclipse.jdt.core.compiler.compliance=11
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
org.eclipse.jdt.core.compiler.release=disabled
org.eclipse.jdt.core.compiler.source=11
org.eclipse.jdt.core.compiler.source=1.8
......@@ -11,6 +11,9 @@
<sonar.organization>beamline</sonar.organization>
<sonar.host.url>https://sonarcloud.io</sonar.host.url>
<flink.version>1.14.3</flink.version>
<log4j.version>2.17.2</log4j.version>
</properties>
<repositories>
......@@ -34,7 +37,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
<version>31.1-jre</version>
</dependency>
<dependency>
<groupId>org.deckfour</groupId>
......@@ -46,16 +49,44 @@
<artifactId>spex</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>com.github.beamline</groupId>
<artifactId>graphviz</artifactId>
......@@ -67,6 +98,7 @@
<version>5.6</version>
</dependency>
<!-- For testing only -->
<dependency>
<groupId>org.junit.jupiter</groupId>
......@@ -74,6 +106,30 @@
<version>5.8.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.11</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.12</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
......
package beamline.events;
import java.io.Serializable;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.deckfour.xes.extension.std.XConceptExtension;
import org.deckfour.xes.extension.std.XTimeExtension;
import org.deckfour.xes.model.XAttribute;
import org.deckfour.xes.model.XAttributeBoolean;
import org.deckfour.xes.model.XAttributeContinuous;
import org.deckfour.xes.model.XAttributeDiscrete;
import org.deckfour.xes.model.XAttributeLiteral;
import org.deckfour.xes.model.XAttributeTimestamp;
import org.deckfour.xes.model.XTrace;
import beamline.exceptions.EventException;
/**
*
* @author Andrea Burattin
*/
public class BEvent implements Serializable, Comparable<BEvent> {
private static final long serialVersionUID = -7300189277034528917L;
private Map<String, Serializable> eventAttributes;
private Map<String, Serializable> traceAttributes;
private Map<String, Serializable> logAttributes;
public BEvent() {
this.eventAttributes = new HashMap<>();
this.traceAttributes = new HashMap<>();
this.logAttributes = new HashMap<>();
}
//
// Factories
//
/**
* Creates a new {@link XTrace} referring to one event
*
* @param activityName the name of the activity
* @param caseId the identifier of the process instance
* @param time the time when the event has happened
* @param eventAttributes a collection of string attributes for the event
* @return the new event
* @throws EventException this exception is thrown is incomplete information
* is provided
*/
public static BEvent create(
String processName,
String activityName,
String caseId,
Date time,
Collection<Pair<String, String>> eventAttributes) throws EventException {
if (processName == null || activityName == null || caseId == null) {
throw new EventException("Activity name or case id missing");
}
BEvent event = new BEvent();
event.setProcessName(processName);
event.setTraceName(caseId);
event.setEventName(activityName);
if (time == null) {
event.setTimestamp(new Date());
} else {
event.setTimestamp(time);
}
if (eventAttributes != null) {
for(Pair<String, String> a : eventAttributes) {
event.setEventAttribute(a.getLeft(), a.getRight());
}
}
return event;
}
/**
* Creates a new {@link XTrace} referring to one event
*
* @param activityName the name of the activity
* @param caseId the identifier of the process instance
* @param time the time when the event has happened
* @return the new event
* @throws EventException this exception is thrown is incomplete information
* is provided
*/
public static BEvent create(String processName, String activityName, String caseId, Date time) throws EventException {
return create(processName, activityName, caseId, time, null);
}
/**
* Creates a new {@link XTrace} referring to one event. The time of the
* event is set to the current time
*
* @param activityName the name of the activity
* @param caseId the identifier of the process instance
* @return the new event
* @throws EventException this exception is thrown is incomplete information
* is provided
*/
public static BEvent create(String processName, String activityName, String caseId) throws EventException {
return create(processName, activityName, caseId, null, null);
}
//
// Specific methods
//
public void setProcessName(String name) {
setLogAttribute(XConceptExtension.KEY_NAME, name);
}
public String getProcessName() {
return (String) logAttributes.get(XConceptExtension.KEY_NAME);
}
public void setTraceName(String name) {
setTraceAttribute(XConceptExtension.KEY_NAME, name);
}
public String getTraceName() {
return (String) traceAttributes.get(XConceptExtension.KEY_NAME);
}
public void setEventName(String name) {
setEventAttribute(XConceptExtension.KEY_NAME, name);
}
public String getEventName() {
return (String) eventAttributes.get(XConceptExtension.KEY_NAME);
}
public void setTimestamp(Date timestamp) {
setEventAttribute(XTimeExtension.KEY_TIMESTAMP, timestamp);
}
public Date getEventTime() {
return (Date) eventAttributes.get(XTimeExtension.KEY_TIMESTAMP);
}
//
// General methods
//
public Map<String, Serializable> getEventAttributes() {
return eventAttributes;
}
public Map<String, Serializable> getTraceAttributes() {
return traceAttributes;
}
public Map<String, Serializable> getLogAttributes() {
return logAttributes;
}
public void setEventAttribute(String name, Serializable value) {
eventAttributes.put(name, value);
}
public void setEventAttribute(String name, XAttribute value) {
setAttributeFromXAttribute(eventAttributes, name, value);
}
public void setTraceAttribute(String name, Serializable value) {
traceAttributes.put(name, value);
}
public void setTraceAttribute(String name, XAttribute value) {
setAttributeFromXAttribute(traceAttributes, name, value);
}
public void setLogAttribute(String name, Serializable value) {
logAttributes.put(name, value);
}
public void setLogAttribute(String name, XAttribute value) {
setAttributeFromXAttribute(logAttributes, name, value);
}
//
// Overrides
//
@Override
public String toString() {
return logAttributes.toString() + " - " + traceAttributes.toString() + " - " + eventAttributes.toString();
}
@Override
public int compareTo(BEvent o) {
if (getEventTime() == null || o.getEventTime() == null) {
return 0;
}
return getEventTime().compareTo(o.getEventTime());
}
//
// Private methods
//
private void setAttributeFromXAttribute(Map<String, Serializable> map, String name, XAttribute value) {
if (value instanceof XAttributeBoolean) {
map.put(name, ((XAttributeBoolean) value).getValue());
} else if (value instanceof XAttributeContinuous) {
map.put(name, ((XAttributeContinuous) value).getValue());
} else if (value instanceof XAttributeDiscrete) {
map.put(name, ((XAttributeDiscrete) value).getValue());
} else if (value instanceof XAttributeLiteral) {
map.put(name, ((XAttributeLiteral) value).getValue());
} else if (value instanceof XAttributeTimestamp) {
map.put(name, ((XAttributeTimestamp) value).getValue());
}
}
}
package beamline.filters;
import org.deckfour.xes.extension.std.XConceptExtension;
import org.deckfour.xes.model.XAttributeLiteral;
import org.deckfour.xes.model.impl.XAttributeLiteralImpl;
/**
* A specific instance of the {@link ExcludeOnEventAttributeEqualityFilter} that
......@@ -11,7 +9,9 @@ import org.deckfour.xes.model.impl.XAttributeLiteralImpl;
* @author Andrea Burattin
*
*/
public class ExcludeActivitiesFilter extends ExcludeOnEventAttributeEqualityFilter<XAttributeLiteral> {
public class ExcludeActivitiesFilter extends ExcludeOnEventAttributeEqualityFilter<String> {
private static final long serialVersionUID = -5319332746992005641L;
/**
* Constructors
......@@ -22,7 +22,7 @@ public class ExcludeActivitiesFilter extends ExcludeOnEventAttributeEqualityFilt
super(XConceptExtension.KEY_NAME);
for (String activity : activities) {
addValue(new XAttributeLiteralImpl(XConceptExtension.KEY_NAME, activity));
addValue(activity);
}
}
}
package beamline.filters;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.deckfour.xes.model.XAttribute;
import org.deckfour.xes.model.XTrace;
import org.apache.flink.api.common.functions.FilterFunction;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.functions.Predicate;
import beamline.events.BEvent;
/**
* This filter excludes events based on the equality of a certain trace
......@@ -20,8 +19,9 @@ import io.reactivex.rxjava3.functions.Predicate;
*
* @param <T> the type of the attribute
*/
public class ExcludeOnCaseAttributeEqualityFilter<T extends XAttribute> implements Predicate<XTrace> {
public class ExcludeOnCaseAttributeEqualityFilter<T extends Serializable> implements FilterFunction<BEvent> {
private static final long serialVersionUID = 371257881178171433L;
private String attributeName;
private Set<T> attributeValues;
......@@ -38,8 +38,8 @@ public class ExcludeOnCaseAttributeEqualityFilter<T extends XAttribute> implemen
}
@Override
public boolean test(@NonNull XTrace t) throws Throwable {
return !attributeValues.contains(t.getAttributes().get(attributeName));
public boolean filter(BEvent t) {
return !attributeValues.contains(t.getTraceAttributes().get(attributeName));
}
}
package beamline.filters;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.deckfour.xes.model.XAttribute;
import org.deckfour.xes.model.XTrace;
import org.apache.flink.api.common.functions.FilterFunction;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.functions.Predicate;
import beamline.events.BEvent;
/**
* This filter excludes events based on the equality of a certain event
......@@ -20,8 +19,9 @@ import io.reactivex.rxjava3.functions.Predicate;
*
* @param <T> the type of the attribute
*/
public class ExcludeOnEventAttributeEqualityFilter<T extends XAttribute> implements Predicate<XTrace> {
public class ExcludeOnEventAttributeEqualityFilter<T extends Serializable> implements FilterFunction<BEvent> {
private static final long serialVersionUID = 1193680203608634150L;
private String attributeName;
private Set<T> attributeValues;
......@@ -47,8 +47,8 @@ public class ExcludeOnEventAttributeEqualityFilter<T extends XAttribute> impleme
}
@Override
public boolean test(@NonNull XTrace t) throws Throwable {
return !attributeValues.contains(t.get(0).getAttributes().get(attributeName));
public boolean filter(BEvent t) {
return !attributeValues.contains(t.getEventAttributes().get(attributeName));
}
}
package beamline.filters;
import org.deckfour.xes.extension.std.XConceptExtension;
import org.deckfour.xes.model.XAttributeLiteral;
import org.deckfour.xes.model.impl.XAttributeLiteralImpl;
/**
* A specific instance of the {@link RetainOnEventAttributeEqualityFilter} that
......@@ -11,7 +9,9 @@ import org.deckfour.xes.model.impl.XAttributeLiteralImpl;
* @author Andrea Burattin
*
*/
public class RetainActivitiesFilter extends RetainOnEventAttributeEqualityFilter<XAttributeLiteral> {
public class RetainActivitiesFilter extends RetainOnEventAttributeEqualityFilter<String> {
private static final long serialVersionUID = 102039300555271213L;
/**
* Constructors
......@@ -22,7 +22,7 @@ public class RetainActivitiesFilter extends RetainOnEventAttributeEqualityFilter
super(XConceptExtension.KEY_NAME);
for (String activity : activities) {
addValue(new XAttributeLiteralImpl(XConceptExtension.KEY_NAME, activity));
addValue(activity);
}
}
}
package beamline.filters;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.deckfour.xes.model.XAttribute;
import org.deckfour.xes.model.XTrace;
import org.apache.flink.api.common.functions.FilterFunction;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.functions.Predicate;
import beamline.events.BEvent;
/**
* This filter retains events based on the equality of a certain trace
......@@ -20,8 +19,9 @@ import io.reactivex.rxjava3.functions.Predicate;
*
* @param <T> the type of the attribute
*/
public class RetainOnCaseAttributeEqualityFilter<T extends XAttribute> implements Predicate<XTrace> {
public class RetainOnCaseAttributeEqualityFilter<T extends Serializable> implements FilterFunction<BEvent> {
private static final long serialVersionUID = 1225284800265650317L;
private String attributeName;
private Set<T> attributeValues;
......@@ -38,8 +38,8 @@ public class RetainOnCaseAttributeEqualityFilter<T extends XAttribute> implement
}
@Override
public boolean test(@NonNull XTrace t) throws Throwable {
return attributeValues.contains(t.getAttributes().get(attributeName));
public boolean filter(BEvent t) {
return attributeValues.contains(t.getTraceAttributes().get(attributeName));
}
}
package beamline.filters;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.deckfour.xes.model.XAttribute;
import org.deckfour.xes.model.XTrace;
import org.apache.flink.api.common.functions.FilterFunction;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.functions.Predicate;
import beamline.events.BEvent;
/**
* This filter retains events based on the equality of a certain event
......@@ -20,8 +19,9 @@ import io.reactivex.rxjava3.functions.Predicate;
*
* @param <T> the type of the attribute
*/
public class RetainOnEventAttributeEqualityFilter<T extends XAttribute> implements Predicate<XTrace> {
public class RetainOnEventAttributeEqualityFilter<T extends Serializable> implements FilterFunction<BEvent> {
private static final long serialVersionUID = -720485056040728235L;
private String attributeName;
private Set<T> attributeValues;
......@@ -47,8 +47,7 @@ public class RetainOnEventAttributeEqualityFilter<T extends XAttribute> implemen
}
@Override
public boolean test(@NonNull XTrace t) throws Throwable {
return attributeValues.contains(t.get(0).getAttributes().get(attributeName));
public boolean filter(BEvent event) {
return attributeValues.contains(event.getEventAttributes().get(attributeName));
}
}
package beamline.mappers;
/*
import java.util.HashMap;
import java.util.Map;
......@@ -23,7 +23,7 @@ import io.reactivex.rxjava3.functions.Function;
* number of case ids grows as well.
*
* @author Andrea Burattin
*/
*
public class InfiniteSizeDirectlyFollowsMapper implements Function<XTrace, ObservableSource<DirectlyFollowsRelation>> {
private Map<String, XEvent> map = new HashMap<>();
......@@ -47,3 +47,5 @@ public class InfiniteSizeDirectlyFollowsMapper implements Function<XTrace, Obser
}
}
*/
class InfiniteSizeDirectlyFollowsMapper{}
\ No newline at end of file
package beamline.models.algorithms;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.functions.Consumer;
import java.io.IOException;
import java.io.Serializable;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import beamline.events.BEvent;
/**
* This abstract class defines the root of the mining algorithms hierarchy. It
* is a {@link Consumer} of elements with type <code>T</code> that is capable of
* producing responses of a certain type <code>K</code>.
* is a {@link MapFunction} of elements with type {@link BEvent} that is capable
* of producing responses of type {@link Response}.
*
* @author Andrea Burattin
*
* @param <T> the type of the consumed events
* @param <K> the type of the responses produced by the mining algorithm
*/
public abstract class StreamMiningAlgorithm<T, K> implements Consumer<T> {
public abstract class StreamMiningAlgorithm extends RichMapFunction<BEvent, Serializable> {
private static final long serialVersionUID = 10170817098305999L;
private transient ValueState<Long> processedEvents;
private transient ValueState<Serializable> latestResponse;
private transient HookEventProcessing onBeforeEvent = null;
private transient HookEventProcessing onAfterEvent = null;
private int processedEvents = 0;
private K latestResponse;
private HookEventProcessing onBeforeEvent = null;
private HookEventProcessing onAfterEvent = null;
@Override
public void open(Configuration parameters) throws Exception {
processedEvents = getRuntimeContext().getState(new ValueStateDescriptor<>("processede vents", Long.class));
latestResponse = getRuntimeContext().getState(new ValueStateDescriptor<>("latest response", Serializable.class));
}
@Override
public Serializable map(BEvent t) throws Exception {
if (onBeforeEvent != null) {
onBeforeEvent.trigger();
}
process(t);
if (onAfterEvent != null) {
onAfterEvent.trigger();
}
return getLatestResponse();
}
/**
* This abstract method is what each derive class is expected to implement.
......@@ -28,15 +52,23 @@ public abstract class StreamMiningAlgorithm<T, K> implements Consumer<T> {
* @param event the new event being observed
* @return the result of the mining of the event
*/
public abstract K ingest(T event);
public abstract Serializable ingest(BEvent event);
/**
* Returns the total number of events processed so far
*
* @return the total number of events processed so far
*/
public int getProcessedEvents() {
return processedEvents;
public long getProcessedEvents() {
try {
if (processedEvents == null || processedEvents.value() == null) {
return 0l;
}
return processedEvents.value().longValue();
} catch (IOException e) {
e.printStackTrace();
}
return -1;
}
/**
......@@ -44,8 +76,13 @@ public abstract class StreamMiningAlgorithm<T, K> implements Consumer<T> {
*
* @return the latest result of the mining
*/
public K getLatestResponse() {
return latestResponse;
public Serializable getLatestResponse() {
try {
return latestResponse.value();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
/**
......@@ -68,24 +105,31 @@ public abstract class StreamMiningAlgorithm<T, K> implements Consumer<T> {
this.onAfterEvent = onAfterEvent;
}
protected void process(T event) {
this.processedEvents++;
latestResponse = ingest(event);
/*
* The internal processor in charge of updating the internal status of the
* map.
*/
protected void process(BEvent event) {
try {
long value = 1;
if (processedEvents.value() != null) {
value = processedEvents.value() + 1;
}
protected K setLatestResponse(K latestResponse) {
this.latestResponse = latestResponse;
return latestResponse;
processedEvents.update(value);
latestResponse.update(ingest(event));
} catch (IOException e) {
e.printStackTrace();
}
@Override
public void accept(@NonNull T t) throws Throwable {
if (onBeforeEvent != null) {
onBeforeEvent.trigger();
}
process(t);
if (onAfterEvent != null) {
onAfterEvent.trigger();
/*
* Setter of the latest response onto the status.
*/
protected void setLatestResponse(Serializable latestResponse) {
try {
this.latestResponse.update(latestResponse);
} catch (IOException e) {
e.printStackTrace();
}
}
}
package beamline.sources;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import beamline.events.BEvent;
/**
* This interface is supposed just to bind the type of {@link SourceFunction} to
* {@link BEvent}.
*
* @author Andrea Burattin
*/
public abstract class BeamlineAbstractSource extends RichSourceFunction<BEvent> {
private static final long serialVersionUID = 1072198158533070679L;
private boolean running = true;
/**
*
* @return
*/
public boolean isRunning() {
return running;
}
@Override
public void cancel() {
running = false;
}
}
\ No newline at end of file
......@@ -8,32 +8,27 @@ import java.util.LinkedList;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import org.deckfour.xes.model.XTrace;
import com.opencsv.CSVParser;
import com.opencsv.CSVReader;
import com.opencsv.CSVReaderBuilder;
import beamline.events.BEvent;
import beamline.exceptions.SourceException;
import beamline.utils.EventUtils;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableEmitter;
import io.reactivex.rxjava3.core.ObservableOnSubscribe;
/**
* This implementation of a {@link XesSource} produces events according to the
* events contained in a CSV file. This source produces a cold observable.
* This implementation of a {@link BeamlineAbstractSource} produces events according to
* the events contained in a CSV file.
*
* @author Andrea Burattin
*/
public class CSVLogSource implements XesSource {
public class CSVLogSource extends BeamlineAbstractSource {
private CSVReader csvReader;
private static final long serialVersionUID = 205574514393782145L;
private transient CSVParser parser;
private String filename;
private int caseIdColumn;
private int activityNameColumn;
private CSVParser parser;
/**
* Constructs the source by providing a CSV parser.
......@@ -75,36 +70,31 @@ public class CSVLogSource implements XesSource {
}
@Override
public Observable<XTrace> getObservable() {
return Observable.create(new ObservableOnSubscribe<XTrace>() {
@Override
public void subscribe(@NonNull ObservableEmitter<@NonNull XTrace> emitter) throws Throwable {
public void run(SourceContext<BEvent> ctx) throws Exception {
Reader reader = null;
CSVReader csvReader = null;
try {
reader = Files.newBufferedReader(Paths.get(filename));
if (parser == null) {
csvReader = new CSVReader(reader);
} else {
csvReader = new CSVReaderBuilder(reader).withCSVParser(parser).build();
}
String[] line;
while ((line = csvReader.readNext()) != null) {
while ((line = csvReader.readNext()) != null && isRunning()) {
List<Pair<String, String>> attributes = new LinkedList<>();
for (int i = 0; i < line.length; i++) {
attributes.add(Pair.of("attribute_" + i, line[i]));
}
emitter.onNext(EventUtils.create(line[activityNameColumn], line[caseIdColumn], null, attributes));
}
emitter.onComplete();
ctx.collect(BEvent.create(filename, line[activityNameColumn], line[caseIdColumn], null, attributes));
}
});
}
@Override
public void prepare() throws SourceException {
Reader reader;
try {
reader = Files.newBufferedReader(Paths.get(filename));
} catch (IOException e) {
throw new SourceException(e.getMessage());
} finally {
if (csvReader != null) {
csvReader.close();
}
if (parser == null) {
csvReader = new CSVReader(reader);
} else {
csvReader = new CSVReaderBuilder(reader).withCSVParser(parser).build();
}
}
}
package beamline.sources;
import java.util.LinkedList;
import java.util.Queue;
import java.util.UUID;
import org.deckfour.xes.model.XTrace;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
......@@ -11,14 +12,12 @@ import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import beamline.events.BEvent;
import beamline.exceptions.SourceException;
import beamline.utils.EventUtils;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.subjects.PublishSubject;
/**
* This implementation of a {@link XesSource} produces events as they are
* observed in an MQTT-XES broker. This source produces a hot observable.
* This implementation of a {@link BeamlineAbstractSource} produces events as
* they are observed in an MQTT-XES broker.
*
* <p>
* Example of usage:
......@@ -32,12 +31,13 @@ import io.reactivex.rxjava3.subjects.PublishSubject;
*
* @author Andrea Burattin
*/
public class MQTTXesSource implements XesSource {
public class MQTTXesSource extends BeamlineAbstractSource {
private static final long serialVersionUID = 7849358403852399322L;
private String processName;
private String brokerHost;
private String topicBase;
private PublishSubject<XTrace> ps;
private transient IMqttClient myClient;
/**
* Constructs the source
......@@ -50,21 +50,15 @@ public class MQTTXesSource implements XesSource {
this.brokerHost = brokerHost;
this.topicBase = topicBase;
this.processName = processName;
this.ps = PublishSubject.create();
}
@Override
public Observable<XTrace> getObservable() {
return ps;
}
@Override
public void prepare() throws SourceException {
public void run(SourceContext<BEvent> ctx) throws Exception {
Queue<BEvent> buffer = new LinkedList<>();
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setKeepAliveInterval(30);
IMqttClient myClient;
try {
myClient = new MqttClient(brokerHost, UUID.randomUUID().toString());
myClient.setCallback(new MqttCallback() {
......@@ -75,7 +69,8 @@ public class MQTTXesSource implements XesSource {
String partBeforeActName = topic.substring(0, posLastSlash);
String activityName = topic.substring(posLastSlash + 1);
String caseId = partBeforeActName.substring(partBeforeActName.lastIndexOf("/") + 1);
ps.onNext(EventUtils.create(activityName, caseId));
BEvent b = BEvent.create(processName, activityName, caseId);
buffer.add(b);
}
@Override
......@@ -93,6 +88,25 @@ public class MQTTXesSource implements XesSource {
} catch (MqttException e) {
throw new SourceException(e.getMessage());
}
while(isRunning()) {
while (buffer.isEmpty()) {
Thread.sleep(100l);
}
ctx.collect(buffer.poll());
}
}
@Override
public void cancel() {
super.cancel();
if (myClient != null && myClient.isConnected()) {
try {
myClient.disconnect();
} catch (MqttException e) {
// nothing to do here
}
}
}
}
package beamline.sources;
import beamline.exceptions.SourceException;
import io.reactivex.rxjava3.core.Observable;
/**
* This interface is the base type that should be extended by all sources to be
* used in the framework. When using a source implementing this type, the method
* {@link #prepare()} should be called <strong>before</strong>
* {@link #getObservable()}.
*
* @author Andrea Burattin
*
* @param <T> the type of observable objects this interface will produce.
*/
public interface Source<T> {
/**
* This method returns the observable created by the source. Before calling
* this method, it is important to prepare the source by calling the
* {@link #prepare()} method.
*
* @return the {@link Observable}
*/
public Observable<T> getObservable();
/**
* This method is supposed to be called before the {@link #getObservable()}
* one: it is in charge of preparing the source to be processed.
*
* @throws SourceException while preparing the source, it is important to be
* aware that some sources may generate specific exceptions (e.g., file not
* found, network problems).
*/
public void prepare() throws SourceException;
}
......@@ -2,43 +2,41 @@ package beamline.sources;
import java.io.File;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.deckfour.xes.extension.std.XConceptExtension;
import org.deckfour.xes.extension.std.XTimeExtension;
import org.deckfour.xes.factory.XFactory;
import org.deckfour.xes.factory.XFactoryNaiveImpl;
import org.deckfour.xes.in.XMxmlGZIPParser;
import org.deckfour.xes.in.XMxmlParser;
import org.deckfour.xes.in.XParser;
import org.deckfour.xes.in.XesXmlGZIPParser;
import org.deckfour.xes.in.XesXmlParser;
import org.deckfour.xes.model.XAttribute;
import org.deckfour.xes.model.XAttributeMap;
import org.deckfour.xes.model.XEvent;
import org.deckfour.xes.model.XLog;
import org.deckfour.xes.model.XTrace;
import beamline.events.BEvent;
import beamline.exceptions.EventException;
import beamline.exceptions.SourceException;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableEmitter;
import io.reactivex.rxjava3.core.ObservableOnSubscribe;
/**
* This implementation of a {@link XesSource} produces events according to the
* events contained in an {@link XLog}. The events are first sorted according to
* their timestamp and then sent. This source produces a cold observable.
* This implementation of a {@link BeamlineAbstractSource} produces events according to
* the events contained in an {@link XLog}. The events are first sorted
* according to their timestamp and then sent.
*
* @author Andrea Burattin
*/
public class XesLogSource implements XesSource {
public class XesLogSource extends BeamlineAbstractSource {
private static XFactory xesFactory = new XFactoryNaiveImpl();
private static final long serialVersionUID = 1095855454671335981L;
private String fileName;
private XLog log;
private List<XTrace> events;
private transient XLog log;
private List<BEvent> events;
/**
* Constructs a source from the path of a log
......@@ -62,28 +60,30 @@ public class XesLogSource implements XesSource {
}
@Override
public Observable<XTrace> getObservable() {
return Observable.create(new ObservableOnSubscribe<XTrace>() {
@Override
public void subscribe(@NonNull ObservableEmitter<@NonNull XTrace> emitter) throws Throwable {
for (XTrace wrapper : events) {
emitter.onNext(wrapper);
}
emitter.onComplete();
}
});
}
@Override
public void prepare() throws SourceException {
public void run(SourceContext<BEvent> ctx) throws Exception {
if (log == null) {
parseLog(fileName);
}
if (events == null) {
prepareStream();
}
Iterator<BEvent> i = events.iterator();
while(i.hasNext() && isRunning()) {
BEvent event = i.next();
if (event.getEventTime() != null) {
ctx.collectWithTimestamp(event, event.getEventTime().getTime());
} else {
ctx.collect(i.next());
}
}
}
private void parseLog(String fileName) throws SourceException {
XParser[] parsers = new XParser[] { new XesXmlGZIPParser(), new XesXmlParser() };
XParser[] parsers = new XParser[] {
new XesXmlGZIPParser(),
new XesXmlParser(),
new XMxmlParser(),
new XMxmlGZIPParser() };
File file = new File(fileName);
for (XParser p : parsers) {
if (p.canParse(file)) {
......@@ -98,41 +98,50 @@ public class XesLogSource implements XesSource {
throw new SourceException("XES file format not supported");
}
private void prepareStream() throws SourceException {
private void prepareStream() throws SourceException, EventException {
if (log.isEmpty()) {
throw new SourceException("The given log is empty");
}
// construct the process name
String processName = XConceptExtension.instance().extractName(log);
if (processName == null) {
processName = "unnamed-xes-process";
if (fileName != null) {
processName = fileName;
}
}
// populate all events
events = new LinkedList<>();
for (XTrace t : log) {
for (XEvent e : t) {
// create the wrapping trace
XTrace eventWrapper = xesFactory.createTrace();
XAttributeMap am = t.getAttributes();
for (Map.Entry<String, XAttribute> v : am.entrySet()) {
eventWrapper.getAttributes().put(v.getKey(), v.getValue());
BEvent be = BEvent.create(
processName,
XConceptExtension.instance().extractName(e),
XConceptExtension.instance().extractName(t),
XTimeExtension.instance().extractTimestamp(e));
// log attributes
for (Map.Entry<String, XAttribute> v : log.getAttributes().entrySet()) {
be.setLogAttribute(v.getKey(), v.getValue());
}
// create the actual event
XEvent newEvent = xesFactory.createEvent();
XAttributeMap amEvent = e.getAttributes();
for (Map.Entry<String, XAttribute> v : amEvent.entrySet()) {
newEvent.getAttributes().put(v.getKey(), v.getValue());
// trace attributes
for (Map.Entry<String, XAttribute> v : t.getAttributes().entrySet()) {
be.setTraceAttribute(v.getKey(), v.getValue());
}
// event attributes
for (Map.Entry<String, XAttribute> v : e.getAttributes().entrySet()) {
be.setEventAttribute(v.getKey(), v.getValue());
}
eventWrapper.add(newEvent);
events.add(eventWrapper);
events.add(be);
}
}
// sort events
Collections.sort(events, (XTrace o1, XTrace o2) -> {
XEvent e1 = o1.get(0);
XEvent e2 = o2.get(0);
Date d1 = XTimeExtension.instance().extractTimestamp(e1);
Date d2 = XTimeExtension.instance().extractTimestamp(e2);
if (d1 == null || d2 == null) {
return 0;
}
return d1.compareTo(d2);
});
Collections.sort(events);
}
}
package beamline.sources;
import org.deckfour.xes.model.XTrace;
/**
* This interface is supposed just to bind the type of {@link Source} to
* {@link XTrace}.
*
* @author Andrea Burattin
*/
public interface XesSource extends Source<XTrace> {
}
\ No newline at end of file
package beamline.utils;
import java.util.Collection;
import java.util.Date;
import org.apache.commons.lang3.tuple.Pair;
import org.deckfour.xes.extension.std.XConceptExtension;
import org.deckfour.xes.extension.std.XTimeExtension;
import org.deckfour.xes.factory.XFactory;
import org.deckfour.xes.factory.XFactoryNaiveImpl;
import org.deckfour.xes.model.XEvent;
import org.deckfour.xes.model.XTrace;
import org.deckfour.xes.model.impl.XAttributeLiteralImpl;
import beamline.exceptions.EventException;
/**
* This class contains some utility methods useful to handle and process events.
*
* @author Andrea Burattin
*/
public class EventUtils {
private static final XFactory xesFactory = new XFactoryNaiveImpl();
private EventUtils() { }
/**
* Creates a new {@link XTrace} referring to one event
*
* @param activityName the name of the activity
* @param caseId the identifier of the process instance
* @param time the time when the event has happened
* @param eventAttributes a collection of string attributes for the event
* @return the new event
* @throws EventException this exception is thrown is incomplete information
* is provided
*/
public static XTrace create(String activityName, String caseId, Date time, Collection<Pair<String, String>> eventAttributes) throws EventException {
if (activityName == null || caseId == null) {
throw new EventException("Activity name or case id missing");
}
XEvent event = xesFactory.createEvent();
XConceptExtension.instance().assignName(event, activityName);
if (time == null) {
XTimeExtension.instance().assignTimestamp(event, new Date());
} else {
XTimeExtension.instance().assignTimestamp(event, time);
}
if (eventAttributes != null) {
for(Pair<String, String> a : eventAttributes) {
event.getAttributes().put(a.getLeft(), new XAttributeLiteralImpl(a.getLeft(), a.getRight()));
}
}
XTrace eventWrapper = xesFactory.createTrace();
XConceptExtension.instance().assignName(eventWrapper, caseId);
eventWrapper.add(event);
return eventWrapper;
}
/**
* Creates a new {@link XTrace} referring to one event
*
* @param activityName the name of the activity
* @param caseId the identifier of the process instance
* @param time the time when the event has happened
* @return the new event
* @throws EventException this exception is thrown is incomplete information
* is provided
*/
public static XTrace create(String activityName, String caseId, Date time) throws EventException {
return create(activityName, caseId, time, null);
}
/**
* Creates a new {@link XTrace} referring to one event. The time of the
* event is set to the current time
*
* @param activityName the name of the activity
* @param caseId the identifier of the process instance
* @return the new event
* @throws EventException this exception is thrown is incomplete information
* is provided
*/
public static XTrace create(String activityName, String caseId) throws EventException {
return create(activityName, caseId, null, null);
}
/**
* Extracts the activity name
*
* @param event the event
* @return the activity name
*/
public static String getActivityName(XTrace event) {
return XConceptExtension.instance().extractName(event.get(0));
}
/**
* Extracts the case id
*
* @param event the event
* @return the case id
*/
public static String getCaseId(XTrace event) {
return XConceptExtension.instance().extractName(event);
}
}
/**
* This package contains utility classes to be used throughout the framework.
*/
package beamline.utils;
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment