diff --git a/.classpath b/.classpath index 0fb79cfe69b2c407027c6a76a12fc59553c9e195..002ad570e2e589f29101a6f361e1e4c36eac358d 100644 --- a/.classpath +++ b/.classpath @@ -24,7 +24,7 @@ <attribute name="test" value="true"/> </attributes> </classpathentry> - <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-11"> + <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8"> <attributes> <attribute name="maven.pomderived" value="true"/> </attributes> diff --git a/.settings/org.eclipse.jdt.core.prefs b/.settings/org.eclipse.jdt.core.prefs index 2af1e7b99c98d3fc61561c085022741062a7820e..2f5cc74c3a8577df9faafd82992b0c62e56352be 100644 --- a/.settings/org.eclipse.jdt.core.prefs +++ b/.settings/org.eclipse.jdt.core.prefs @@ -1,8 +1,8 @@ eclipse.preferences.version=1 -org.eclipse.jdt.core.compiler.codegen.targetPlatform=11 -org.eclipse.jdt.core.compiler.compliance=11 +org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8 +org.eclipse.jdt.core.compiler.compliance=1.8 org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore org.eclipse.jdt.core.compiler.release=disabled -org.eclipse.jdt.core.compiler.source=11 +org.eclipse.jdt.core.compiler.source=1.8 diff --git a/pom.xml b/pom.xml index 3035e9f1946c422d772cb9abb21cde80d302a0d3..23561860aefff8923c15531f4abbfbfa46224479 100644 --- a/pom.xml +++ b/pom.xml @@ -11,6 +11,9 @@ <sonar.organization>beamline</sonar.organization> <sonar.host.url>https://sonarcloud.io</sonar.host.url> + + <flink.version>1.14.3</flink.version> + <log4j.version>2.17.2</log4j.version> </properties> <repositories> @@ -34,7 +37,7 @@ <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> - <version>29.0-jre</version> + <version>31.1-jre</version> </dependency> <dependency> <groupId>org.deckfour</groupId> @@ -46,16 +49,44 @@ <artifactId>spex</artifactId> <version>1.0</version> </dependency> + + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <version>${log4j.version}</version> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <version>${log4j.version}</version> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + <version>${log4j.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.11</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_2.11</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.0</version> </dependency> - <dependency> - <groupId>io.reactivex.rxjava3</groupId> - <artifactId>rxjava</artifactId> - <version>3.1.3</version> - </dependency> <dependency> <groupId>com.github.beamline</groupId> <artifactId>graphviz</artifactId> @@ -66,7 +97,8 @@ <artifactId>opencsv</artifactId> <version>5.6</version> </dependency> - + + <!-- For testing only --> <dependency> <groupId>org.junit.jupiter</groupId> @@ -74,6 +106,30 @@ <version>5.8.2</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.11</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.11</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + <classifier>tests</classifier> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.12</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest</artifactId> diff --git a/src/main/java/beamline/events/BEvent.java b/src/main/java/beamline/events/BEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..c4de45fcb3858b2013bc9390a73e1a20a9aacedc --- /dev/null +++ b/src/main/java/beamline/events/BEvent.java @@ -0,0 +1,251 @@ +package beamline.events; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.commons.lang3.tuple.Pair; +import org.deckfour.xes.extension.std.XConceptExtension; +import org.deckfour.xes.extension.std.XTimeExtension; +import org.deckfour.xes.model.XAttribute; +import org.deckfour.xes.model.XAttributeBoolean; +import org.deckfour.xes.model.XAttributeContinuous; +import org.deckfour.xes.model.XAttributeDiscrete; +import org.deckfour.xes.model.XAttributeLiteral; +import org.deckfour.xes.model.XAttributeTimestamp; +import org.deckfour.xes.model.XTrace; + +import beamline.exceptions.EventException; + +/** + * + * @author Andrea Burattin + */ +public class BEvent implements Serializable, Comparable<BEvent> { + + private static final long serialVersionUID = -7300189277034528917L; + + private Map<String, Serializable> eventAttributes; + private Map<String, Serializable> traceAttributes; + private Map<String, Serializable> logAttributes; + + public BEvent() { + this.eventAttributes = new HashMap<>(); + this.traceAttributes = new HashMap<>(); + this.logAttributes = new HashMap<>(); + } + + // + // Factories + // + /** + * Creates a new {@link XTrace} referring to one event + * + * @param activityName the name of the activity + * @param caseId the identifier of the process instance + * @param time the time when the event has happened + * @param eventAttributes a collection of string attributes for the event + * @return the new event + * @throws EventException this exception is thrown is incomplete information + * is provided + */ + public static BEvent create( + String processName, + String activityName, + String caseId, + Date time, + Collection<Pair<String, String>> eventAttributes) throws EventException { + if (processName == null || activityName == null || caseId == null) { + throw new EventException("Activity name or case id missing"); + } + + BEvent event = new BEvent(); + event.setProcessName(processName); + event.setTraceName(caseId); + event.setEventName(activityName); + if (time == null) { + event.setTimestamp(new Date()); + } else { + event.setTimestamp(time); + } + + if (eventAttributes != null) { + for(Pair<String, String> a : eventAttributes) { + event.setEventAttribute(a.getLeft(), a.getRight()); + } + } + return event; + } + + /** + * Creates a new {@link XTrace} referring to one event + * + * @param activityName the name of the activity + * @param caseId the identifier of the process instance + * @param time the time when the event has happened + * @return the new event + * @throws EventException this exception is thrown is incomplete information + * is provided + */ + public static BEvent create(String processName, String activityName, String caseId, Date time) throws EventException { + return create(processName, activityName, caseId, time, null); + } + + /** + * Creates a new {@link XTrace} referring to one event. The time of the + * event is set to the current time + * + * @param activityName the name of the activity + * @param caseId the identifier of the process instance + * @return the new event + * @throws EventException this exception is thrown is incomplete information + * is provided + */ + public static BEvent create(String processName, String activityName, String caseId) throws EventException { + return create(processName, activityName, caseId, null, null); + } + + // + // Specific methods + // + public void setProcessName(String name) { + setLogAttribute(XConceptExtension.KEY_NAME, name); + } + + public String getProcessName() { + return (String) logAttributes.get(XConceptExtension.KEY_NAME); + } + + public void setTraceName(String name) { + setTraceAttribute(XConceptExtension.KEY_NAME, name); + } + + public String getTraceName() { + return (String) traceAttributes.get(XConceptExtension.KEY_NAME); + } + + public void setEventName(String name) { + setEventAttribute(XConceptExtension.KEY_NAME, name); + } + + public String getEventName() { + return (String) eventAttributes.get(XConceptExtension.KEY_NAME); + } + + public void setTimestamp(Date timestamp) { + setEventAttribute(XTimeExtension.KEY_TIMESTAMP, timestamp); + } + + public Date getEventTime() { + return (Date) eventAttributes.get(XTimeExtension.KEY_TIMESTAMP); + } + + // + // General methods + // + + public Map<String, Serializable> getEventAttributes() { + return eventAttributes; + } + + public Map<String, Serializable> getTraceAttributes() { + return traceAttributes; + } + + public Map<String, Serializable> getLogAttributes() { + return logAttributes; + } + + public void setEventAttribute(String name, Serializable value) { + eventAttributes.put(name, value); + } + + public void setEventAttribute(String name, XAttribute value) { + setAttributeFromXAttribute(eventAttributes, name, value); + } + + public void setTraceAttribute(String name, Serializable value) { + traceAttributes.put(name, value); + } + + public void setTraceAttribute(String name, XAttribute value) { + setAttributeFromXAttribute(traceAttributes, name, value); + } + + public void setLogAttribute(String name, Serializable value) { + logAttributes.put(name, value); + } + + public void setLogAttribute(String name, XAttribute value) { + setAttributeFromXAttribute(logAttributes, name, value); + } + + // + // Overrides + // + + @Override + public String toString() { + return logAttributes.toString() + " - " + traceAttributes.toString() + " - " + eventAttributes.toString(); + } + + @Override + public int compareTo(BEvent o) { + if (getEventTime() == null || o.getEventTime() == null) { + return 0; + } + return getEventTime().compareTo(o.getEventTime()); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (obj == this) { + return true; + } + if (obj.getClass() != getClass()) { + return false; + } + BEvent other = (BEvent) obj; + return new EqualsBuilder() + .appendSuper(super.equals(obj)) + .append(logAttributes, other.logAttributes) + .append(traceAttributes, other.traceAttributes) + .append(eventAttributes, other.eventAttributes) + .isEquals(); + + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37) + .append(logAttributes) + .append(traceAttributes) + .append(eventAttributes) + .toHashCode(); + } + + // + // Private methods + // + + private void setAttributeFromXAttribute(Map<String, Serializable> map, String name, XAttribute value) { + if (value instanceof XAttributeBoolean) { + map.put(name, ((XAttributeBoolean) value).getValue()); + } else if (value instanceof XAttributeContinuous) { + map.put(name, ((XAttributeContinuous) value).getValue()); + } else if (value instanceof XAttributeDiscrete) { + map.put(name, ((XAttributeDiscrete) value).getValue()); + } else if (value instanceof XAttributeLiteral) { + map.put(name, ((XAttributeLiteral) value).getValue()); + } else if (value instanceof XAttributeTimestamp) { + map.put(name, ((XAttributeTimestamp) value).getValue()); + } + } +} diff --git a/src/main/java/beamline/filters/ExcludeActivitiesFilter.java b/src/main/java/beamline/filters/ExcludeActivitiesFilter.java index 4886ad014508aef80c95911d458bce39dedbaef4..9f08a59f34f9b4d0f18133958fe79ccbd8e415dd 100644 --- a/src/main/java/beamline/filters/ExcludeActivitiesFilter.java +++ b/src/main/java/beamline/filters/ExcludeActivitiesFilter.java @@ -1,8 +1,6 @@ package beamline.filters; import org.deckfour.xes.extension.std.XConceptExtension; -import org.deckfour.xes.model.XAttributeLiteral; -import org.deckfour.xes.model.impl.XAttributeLiteralImpl; /** * A specific instance of the {@link ExcludeOnEventAttributeEqualityFilter} that @@ -11,7 +9,9 @@ import org.deckfour.xes.model.impl.XAttributeLiteralImpl; * @author Andrea Burattin * */ -public class ExcludeActivitiesFilter extends ExcludeOnEventAttributeEqualityFilter<XAttributeLiteral> { +public class ExcludeActivitiesFilter extends ExcludeOnEventAttributeEqualityFilter<String> { + + private static final long serialVersionUID = -5319332746992005641L; /** * Constructors @@ -22,7 +22,7 @@ public class ExcludeActivitiesFilter extends ExcludeOnEventAttributeEqualityFilt super(XConceptExtension.KEY_NAME); for (String activity : activities) { - addValue(new XAttributeLiteralImpl(XConceptExtension.KEY_NAME, activity)); + addValue(activity); } } } diff --git a/src/main/java/beamline/filters/ExcludeOnCaseAttributeEqualityFilter.java b/src/main/java/beamline/filters/ExcludeOnCaseAttributeEqualityFilter.java index 32efc97ea0df711e1cfbfc9edda1998073c813ea..87c2cec410079ecd459f1644b541cac582ff5223 100644 --- a/src/main/java/beamline/filters/ExcludeOnCaseAttributeEqualityFilter.java +++ b/src/main/java/beamline/filters/ExcludeOnCaseAttributeEqualityFilter.java @@ -1,14 +1,13 @@ package beamline.filters; +import java.io.Serializable; import java.util.Arrays; import java.util.HashSet; import java.util.Set; -import org.deckfour.xes.model.XAttribute; -import org.deckfour.xes.model.XTrace; +import org.apache.flink.api.common.functions.FilterFunction; -import io.reactivex.rxjava3.annotations.NonNull; -import io.reactivex.rxjava3.functions.Predicate; +import beamline.events.BEvent; /** * This filter excludes events based on the equality of a certain trace @@ -20,8 +19,9 @@ import io.reactivex.rxjava3.functions.Predicate; * * @param <T> the type of the attribute */ -public class ExcludeOnCaseAttributeEqualityFilter<T extends XAttribute> implements Predicate<XTrace> { +public class ExcludeOnCaseAttributeEqualityFilter<T extends Serializable> implements FilterFunction<BEvent> { + private static final long serialVersionUID = 371257881178171433L; private String attributeName; private Set<T> attributeValues; @@ -38,8 +38,8 @@ public class ExcludeOnCaseAttributeEqualityFilter<T extends XAttribute> implemen } @Override - public boolean test(@NonNull XTrace t) throws Throwable { - return !attributeValues.contains(t.getAttributes().get(attributeName)); + public boolean filter(BEvent t) { + return !attributeValues.contains(t.getTraceAttributes().get(attributeName)); } } diff --git a/src/main/java/beamline/filters/ExcludeOnEventAttributeEqualityFilter.java b/src/main/java/beamline/filters/ExcludeOnEventAttributeEqualityFilter.java index 1e090c40ea5695bc75225f6b38e12de8a830fc32..94e8fb86963c045a11f919ead66ef233f70c2faf 100644 --- a/src/main/java/beamline/filters/ExcludeOnEventAttributeEqualityFilter.java +++ b/src/main/java/beamline/filters/ExcludeOnEventAttributeEqualityFilter.java @@ -1,14 +1,13 @@ package beamline.filters; +import java.io.Serializable; import java.util.Arrays; import java.util.HashSet; import java.util.Set; -import org.deckfour.xes.model.XAttribute; -import org.deckfour.xes.model.XTrace; +import org.apache.flink.api.common.functions.FilterFunction; -import io.reactivex.rxjava3.annotations.NonNull; -import io.reactivex.rxjava3.functions.Predicate; +import beamline.events.BEvent; /** * This filter excludes events based on the equality of a certain event @@ -20,8 +19,9 @@ import io.reactivex.rxjava3.functions.Predicate; * * @param <T> the type of the attribute */ -public class ExcludeOnEventAttributeEqualityFilter<T extends XAttribute> implements Predicate<XTrace> { +public class ExcludeOnEventAttributeEqualityFilter<T extends Serializable> implements FilterFunction<BEvent> { + private static final long serialVersionUID = 1193680203608634150L; private String attributeName; private Set<T> attributeValues; @@ -47,8 +47,8 @@ public class ExcludeOnEventAttributeEqualityFilter<T extends XAttribute> impleme } @Override - public boolean test(@NonNull XTrace t) throws Throwable { - return !attributeValues.contains(t.get(0).getAttributes().get(attributeName)); + public boolean filter(BEvent t) { + return !attributeValues.contains(t.getEventAttributes().get(attributeName)); } } diff --git a/src/main/java/beamline/filters/RetainActivitiesFilter.java b/src/main/java/beamline/filters/RetainActivitiesFilter.java index 8e0a5253517d99716ef98848985c58ac83d701b9..93249161ff2b3f21c08e61288bde518a0d7648a9 100644 --- a/src/main/java/beamline/filters/RetainActivitiesFilter.java +++ b/src/main/java/beamline/filters/RetainActivitiesFilter.java @@ -1,8 +1,6 @@ package beamline.filters; import org.deckfour.xes.extension.std.XConceptExtension; -import org.deckfour.xes.model.XAttributeLiteral; -import org.deckfour.xes.model.impl.XAttributeLiteralImpl; /** * A specific instance of the {@link RetainOnEventAttributeEqualityFilter} that @@ -11,7 +9,9 @@ import org.deckfour.xes.model.impl.XAttributeLiteralImpl; * @author Andrea Burattin * */ -public class RetainActivitiesFilter extends RetainOnEventAttributeEqualityFilter<XAttributeLiteral> { +public class RetainActivitiesFilter extends RetainOnEventAttributeEqualityFilter<String> { + + private static final long serialVersionUID = 102039300555271213L; /** * Constructors @@ -22,7 +22,7 @@ public class RetainActivitiesFilter extends RetainOnEventAttributeEqualityFilter super(XConceptExtension.KEY_NAME); for (String activity : activities) { - addValue(new XAttributeLiteralImpl(XConceptExtension.KEY_NAME, activity)); + addValue(activity); } } } diff --git a/src/main/java/beamline/filters/RetainOnCaseAttributeEqualityFilter.java b/src/main/java/beamline/filters/RetainOnCaseAttributeEqualityFilter.java index 091d1aa3d9711b93db6674bb03fad35710781ac8..d781227fbb63934a475e018f6ca5c66c59146cc4 100644 --- a/src/main/java/beamline/filters/RetainOnCaseAttributeEqualityFilter.java +++ b/src/main/java/beamline/filters/RetainOnCaseAttributeEqualityFilter.java @@ -1,14 +1,13 @@ package beamline.filters; +import java.io.Serializable; import java.util.Arrays; import java.util.HashSet; import java.util.Set; -import org.deckfour.xes.model.XAttribute; -import org.deckfour.xes.model.XTrace; +import org.apache.flink.api.common.functions.FilterFunction; -import io.reactivex.rxjava3.annotations.NonNull; -import io.reactivex.rxjava3.functions.Predicate; +import beamline.events.BEvent; /** * This filter retains events based on the equality of a certain trace @@ -20,8 +19,9 @@ import io.reactivex.rxjava3.functions.Predicate; * * @param <T> the type of the attribute */ -public class RetainOnCaseAttributeEqualityFilter<T extends XAttribute> implements Predicate<XTrace> { +public class RetainOnCaseAttributeEqualityFilter<T extends Serializable> implements FilterFunction<BEvent> { + private static final long serialVersionUID = 1225284800265650317L; private String attributeName; private Set<T> attributeValues; @@ -38,8 +38,8 @@ public class RetainOnCaseAttributeEqualityFilter<T extends XAttribute> implement } @Override - public boolean test(@NonNull XTrace t) throws Throwable { - return attributeValues.contains(t.getAttributes().get(attributeName)); + public boolean filter(BEvent t) { + return attributeValues.contains(t.getTraceAttributes().get(attributeName)); } } diff --git a/src/main/java/beamline/filters/RetainOnEventAttributeEqualityFilter.java b/src/main/java/beamline/filters/RetainOnEventAttributeEqualityFilter.java index f923054a6fd0e2dd45223b83bed9d983f9abe91f..13754046483d3522a5220a51a8efce540749dd54 100644 --- a/src/main/java/beamline/filters/RetainOnEventAttributeEqualityFilter.java +++ b/src/main/java/beamline/filters/RetainOnEventAttributeEqualityFilter.java @@ -1,14 +1,13 @@ package beamline.filters; +import java.io.Serializable; import java.util.Arrays; import java.util.HashSet; import java.util.Set; -import org.deckfour.xes.model.XAttribute; -import org.deckfour.xes.model.XTrace; +import org.apache.flink.api.common.functions.FilterFunction; -import io.reactivex.rxjava3.annotations.NonNull; -import io.reactivex.rxjava3.functions.Predicate; +import beamline.events.BEvent; /** * This filter retains events based on the equality of a certain event @@ -20,8 +19,9 @@ import io.reactivex.rxjava3.functions.Predicate; * * @param <T> the type of the attribute */ -public class RetainOnEventAttributeEqualityFilter<T extends XAttribute> implements Predicate<XTrace> { +public class RetainOnEventAttributeEqualityFilter<T extends Serializable> implements FilterFunction<BEvent> { + private static final long serialVersionUID = -720485056040728235L; private String attributeName; private Set<T> attributeValues; @@ -47,8 +47,7 @@ public class RetainOnEventAttributeEqualityFilter<T extends XAttribute> implemen } @Override - public boolean test(@NonNull XTrace t) throws Throwable { - return attributeValues.contains(t.get(0).getAttributes().get(attributeName)); + public boolean filter(BEvent event) { + return attributeValues.contains(event.getEventAttributes().get(attributeName)); } - } diff --git a/src/main/java/beamline/mappers/DirectlyFollowsRelation.java b/src/main/java/beamline/mappers/DirectlyFollowsRelation.java deleted file mode 100644 index a15690cf5f1758e464c607f41bad1087ef535552..0000000000000000000000000000000000000000 --- a/src/main/java/beamline/mappers/DirectlyFollowsRelation.java +++ /dev/null @@ -1,56 +0,0 @@ -package beamline.mappers; - -import org.deckfour.xes.model.XEvent; - -/** - * This class represents a directly follows relation as produced by - * {@link InfiniteSizeDirectlyFollowsMapper}. - * - * @author Andrea Burattin - */ -public class DirectlyFollowsRelation { - - private String caseId; - private XEvent first; - private XEvent second; - - /** - * Constructor - * - * @param caseId the case id - * @param first the first event - * @param second the second event - */ - public DirectlyFollowsRelation(String caseId, XEvent first, XEvent second) { - this.caseId = caseId; - this.first = first; - this.second = second; - } - - /** - * Returns the case id this directly follows relation belongs to - * - * @return the case id - */ - public String getCaseId() { - return caseId; - } - - /** - * Returns the first event - * - * @return the first event - */ - public XEvent getFirst() { - return first; - } - - /** - * Returns the second event - * - * @return the second event - */ - public XEvent getSecond() { - return second; - } -} diff --git a/src/main/java/beamline/mappers/InfiniteSizeDirectlyFollowsMapper.java b/src/main/java/beamline/mappers/InfiniteSizeDirectlyFollowsMapper.java deleted file mode 100644 index f2c8b648aea1bf1c3b484371d5c00d85009ea116..0000000000000000000000000000000000000000 --- a/src/main/java/beamline/mappers/InfiniteSizeDirectlyFollowsMapper.java +++ /dev/null @@ -1,49 +0,0 @@ -package beamline.mappers; - -import java.util.HashMap; -import java.util.Map; - -import org.deckfour.xes.extension.std.XConceptExtension; -import org.deckfour.xes.model.XEvent; -import org.deckfour.xes.model.XTrace; - -import io.reactivex.rxjava3.annotations.NonNull; -import io.reactivex.rxjava3.core.Observable; -import io.reactivex.rxjava3.core.ObservableSource; -import io.reactivex.rxjava3.functions.Function; - -/** - * This mapper transforms a stream of {@link XTrace}s into a stream of - * {@link DirectlyFollowsRelation}s. It transforms each pair of consequent - * events appearing in the same case as a directly follows operator (generating - * an object with type {@link DirectlyFollowsRelation}). - * - * <p> - * This mapper is called infinite because it's memory footprint will grow as the - * number of case ids grows as well. - * - * @author Andrea Burattin - */ -public class InfiniteSizeDirectlyFollowsMapper implements Function<XTrace, ObservableSource<DirectlyFollowsRelation>> { - - private Map<String, XEvent> map = new HashMap<>(); - - @Override - public @NonNull ObservableSource<DirectlyFollowsRelation> apply(@NonNull XTrace t) throws Throwable { - String caseId = XConceptExtension.instance().extractName(t); - DirectlyFollowsRelation toRet = null; - - if (map.containsKey(caseId)) { - toRet = new DirectlyFollowsRelation(caseId, map.get(caseId), t.get(0)); - } - - map.put(caseId, t.get(0)); - - if (toRet == null) { - return Observable.empty(); - } else { - return Observable.just(toRet); - } - } - -} diff --git a/src/main/java/beamline/mappers/package-info.java b/src/main/java/beamline/mappers/package-info.java deleted file mode 100644 index 9e81654575586012e88a585616ee139bbaadb325..0000000000000000000000000000000000000000 --- a/src/main/java/beamline/mappers/package-info.java +++ /dev/null @@ -1,5 +0,0 @@ -/** - * This package contains some mappers that are available by default in the - * framework. - */ -package beamline.mappers; \ No newline at end of file diff --git a/src/main/java/beamline/models/algorithms/HookEventProcessing.java b/src/main/java/beamline/models/algorithms/HookEventProcessing.java deleted file mode 100644 index 590620e23622c47b7cbe6f97f6a5bf558c537656..0000000000000000000000000000000000000000 --- a/src/main/java/beamline/models/algorithms/HookEventProcessing.java +++ /dev/null @@ -1,17 +0,0 @@ -package beamline.models.algorithms; - -/** - * This interface defines the structure of the callback function that a - * {@link StreamMiningAlgorithm} can execute (cf., - * {@link StreamMiningAlgorithm#setOnBeforeEvent(HookEventProcessing)} and - * {@link StreamMiningAlgorithm#setOnAfterEvent(HookEventProcessing)}). - * - * @author Andrea Burattin - */ -public interface HookEventProcessing { - - /** - * The actual function to trigger - */ - public void trigger(); -} diff --git a/src/main/java/beamline/models/algorithms/InfiniteSizeDirectlyFollowsMapper.java b/src/main/java/beamline/models/algorithms/InfiniteSizeDirectlyFollowsMapper.java new file mode 100644 index 0000000000000000000000000000000000000000..6eadf941de91fb083154c5d4ff466a3029863226 --- /dev/null +++ b/src/main/java/beamline/models/algorithms/InfiniteSizeDirectlyFollowsMapper.java @@ -0,0 +1,41 @@ +package beamline.models.algorithms; + +import java.util.HashMap; +import java.util.Map; + +import org.deckfour.xes.model.XTrace; + +import beamline.events.BEvent; +import beamline.models.responses.DirectlyFollowsRelation; + +/** + * This mapper transforms a stream of {@link XTrace}s into a stream of + * {@link DirectlyFollowsRelation}s. It transforms each pair of consequent + * events appearing in the same case as a directly follows operator (generating + * an object with type {@link DirectlyFollowsRelation}). + * + * <p> + * This mapper is called infinite because it's memory footprint will grow as the + * number of case ids grows as well. + * + * @author Andrea Burattin + */ +public class InfiniteSizeDirectlyFollowsMapper extends StreamMiningAlgorithm<DirectlyFollowsRelation> { + + private static final long serialVersionUID = 9114527510820073110L; + private Map<String, BEvent> map = new HashMap<>(); + + @Override + public DirectlyFollowsRelation ingest(BEvent event) throws Exception { + String caseId = event.getTraceName(); + DirectlyFollowsRelation toRet = null; + + if (map.containsKey(caseId)) { + toRet = new DirectlyFollowsRelation(map.get(caseId), event); + } + + map.put(caseId, event); + + return toRet; + } +} diff --git a/src/main/java/beamline/models/algorithms/StreamMiningAlgorithm.java b/src/main/java/beamline/models/algorithms/StreamMiningAlgorithm.java index 5f2c812e1dbff166c24f1a96065daeefaa0392d6..f0c4cf943da1cafd9ec851c782faae1560b4c76c 100644 --- a/src/main/java/beamline/models/algorithms/StreamMiningAlgorithm.java +++ b/src/main/java/beamline/models/algorithms/StreamMiningAlgorithm.java @@ -1,24 +1,41 @@ package beamline.models.algorithms; -import io.reactivex.rxjava3.annotations.NonNull; -import io.reactivex.rxjava3.functions.Consumer; +import java.io.IOException; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Collector; + +import beamline.events.BEvent; +import beamline.models.responses.Response; /** * This abstract class defines the root of the mining algorithms hierarchy. It - * is a {@link Consumer} of elements with type <code>T</code> that is capable of - * producing responses of a certain type <code>K</code>. + * is a {@link MapFunction} of elements with type {@link BEvent} that is capable + * of producing responses of type {@link Response}. * * @author Andrea Burattin - * - * @param <T> the type of the consumed events - * @param <K> the type of the responses produced by the mining algorithm */ -public abstract class StreamMiningAlgorithm<T, K> implements Consumer<T> { +public abstract class StreamMiningAlgorithm<T extends Response> extends RichFlatMapFunction<BEvent, T> { - private int processedEvents = 0; - private K latestResponse; - private HookEventProcessing onBeforeEvent = null; - private HookEventProcessing onAfterEvent = null; + private static final long serialVersionUID = 10170817098305999L; + private transient ValueState<Long> processedEvents; + + @Override + public void open(Configuration parameters) throws Exception { + processedEvents = getRuntimeContext().getState(new ValueStateDescriptor<>("processed-events", Long.class)); + } + + @Override + public void flatMap(BEvent event, Collector<T> out) throws Exception { + T latestResponse = process(event); + if (latestResponse != null) { + out.collect(latestResponse); + } + } /** * This abstract method is what each derive class is expected to implement. @@ -27,65 +44,46 @@ public abstract class StreamMiningAlgorithm<T, K> implements Consumer<T> { * * @param event the new event being observed * @return the result of the mining of the event + * @throws Exception */ - public abstract K ingest(T event); + public abstract T ingest(BEvent event) throws Exception; /** * Returns the total number of events processed so far * * @return the total number of events processed so far */ - public int getProcessedEvents() { - return processedEvents; - } - - /** - * Returns the latest result of the mining - * - * @return the latest result of the mining - */ - public K getLatestResponse() { - return latestResponse; + public long getProcessedEvents() { + try { + if (processedEvents == null || processedEvents.value() == null) { + return -1; + } + return processedEvents.value().longValue(); + } catch (IOException e) { + e.printStackTrace(); + } + return -1; } - /** - * This method can be used to set a hook to a callback function to be - * executed before an event is processed - * - * @param onBeforeEvent the callback function - */ - public void setOnBeforeEvent(HookEventProcessing onBeforeEvent) { - this.onBeforeEvent = onBeforeEvent; - } - /** - * This method can be used to set a hook to a callback function to be - * executed after an event is processed - * - * @param onAfterEvent the callback function + /* + * The internal processor in charge of updating the internal status of the + * map. */ - public void setOnAfterEvent(HookEventProcessing onAfterEvent) { - this.onAfterEvent = onAfterEvent; - } - - protected void process(T event) { - this.processedEvents++; - latestResponse = ingest(event); - } - - protected K setLatestResponse(K latestResponse) { - this.latestResponse = latestResponse; - return latestResponse; - } - - @Override - public void accept(@NonNull T t) throws Throwable { - if (onBeforeEvent != null) { - onBeforeEvent.trigger(); + protected T process(BEvent event) throws Exception { + try { + long value = 1; + if (processedEvents.value() != null) { + value = processedEvents.value() + 1; + } + processedEvents.update(value); + } catch (IOException e) { + e.printStackTrace(); } - process(t); - if (onAfterEvent != null) { - onAfterEvent.trigger(); + T tmp = ingest(event); + if (tmp != null) { + tmp.setProcessedEvents(getProcessedEvents()); } + return tmp; } } diff --git a/src/main/java/beamline/models/responses/DirectlyFollowsRelation.java b/src/main/java/beamline/models/responses/DirectlyFollowsRelation.java new file mode 100644 index 0000000000000000000000000000000000000000..0b2742e1e4986d0231546011a52a0639ef2ea490 --- /dev/null +++ b/src/main/java/beamline/models/responses/DirectlyFollowsRelation.java @@ -0,0 +1,89 @@ +package beamline.models.responses; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.commons.lang3.tuple.Pair; + +import beamline.events.BEvent; +import beamline.models.algorithms.InfiniteSizeDirectlyFollowsMapper; + +/** + * This class represents a directly follows relation as produced by + * {@link InfiniteSizeDirectlyFollowsMapper}. + * + * @author Andrea Burattin + */ +public class DirectlyFollowsRelation extends Response { + + private static final long serialVersionUID = 1775695752885219490L; + private Pair<BEvent, BEvent> pair; + + /** + * Constructor + * + * @param caseId the case id + * @param first the first event + * @param second the second event + */ + public DirectlyFollowsRelation(BEvent from, BEvent to) { + if (!from.getTraceName().equals(to.getTraceName())) { + throw new IllegalArgumentException(); + } + pair = Pair.of(from, to); + } + + /** + * Returns the case id this directly follows relation belongs to + * + * @return the case id + */ + public String getCaseId() { + return pair.getLeft().getTraceName(); + } + + /** + * Returns the source event + * + * @return the source event + */ + public BEvent getFrom() { + return pair.getLeft(); + } + + /** + * Returns the target event + * + * @return the target event + */ + public BEvent getTo() { + return pair.getRight(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (obj == this) { + return true; + } + if (obj.getClass() != getClass()) { + return false; + } + DirectlyFollowsRelation other = (DirectlyFollowsRelation) obj; + return new EqualsBuilder() + .appendSuper(super.equals(obj)) + .append(getFrom(), other.getFrom()) + .append(getTo(), other.getTo()) + .isEquals(); + + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37) + .append(getFrom()) + .append(getTo()) + .toHashCode(); + } +} diff --git a/src/main/java/beamline/models/responses/GraphvizResponse.java b/src/main/java/beamline/models/responses/GraphvizResponse.java index d91a232a1bca47dd70eac9c7ad8eaf857dfd8262..2dcef1ef42b3106c20f39d19a2e1f88e992fda58 100644 --- a/src/main/java/beamline/models/responses/GraphvizResponse.java +++ b/src/main/java/beamline/models/responses/GraphvizResponse.java @@ -8,7 +8,9 @@ import beamline.graphviz.Dot; * * @author Andrea Burattin */ -public interface GraphvizResponse extends Response { +public abstract class GraphvizResponse extends Response { + + private static final long serialVersionUID = 7232727657074096321L; /** * Returns the Dot representation of the response diff --git a/src/main/java/beamline/models/responses/Response.java b/src/main/java/beamline/models/responses/Response.java index c728c69cb7ca04379efe7af671c19bac0616301d..256972c6bccf772cca42fa67197872c44df0cce2 100644 --- a/src/main/java/beamline/models/responses/Response.java +++ b/src/main/java/beamline/models/responses/Response.java @@ -1,10 +1,24 @@ package beamline.models.responses; +import java.io.Serializable; + /** * Marker interface used to define the type of the responses * * @author Andrea Burattin */ -public interface Response { +public class Response implements Serializable { + + private static final long serialVersionUID = 3104314256198741100L; + private Long processedEvents; + + public Long getProcessedEvents() { + return processedEvents; + } + public void setProcessedEvents(Long processedEvents) { + this.processedEvents = processedEvents; + } + + } diff --git a/src/main/java/beamline/models/responses/StringResponse.java b/src/main/java/beamline/models/responses/StringResponse.java new file mode 100644 index 0000000000000000000000000000000000000000..1f27d853d665daaa4a90b48a0a1c338f9cb8cac2 --- /dev/null +++ b/src/main/java/beamline/models/responses/StringResponse.java @@ -0,0 +1,24 @@ +package beamline.models.responses; + +public class StringResponse extends Response { + + private static final long serialVersionUID = 7863665787098981704L; + private String str; + + public StringResponse(String str) { + set(str); + } + + public String get() { + return str; + } + + public void set(String str) { + this.str = str; + } + + @Override + public String toString() { + return str; + } +} diff --git a/src/main/java/beamline/sources/BeamlineAbstractSource.java b/src/main/java/beamline/sources/BeamlineAbstractSource.java new file mode 100644 index 0000000000000000000000000000000000000000..ed39cb4172298b3607870ea3823c51fe639e8f1d --- /dev/null +++ b/src/main/java/beamline/sources/BeamlineAbstractSource.java @@ -0,0 +1,31 @@ +package beamline.sources; + +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import beamline.events.BEvent; + +/** + * This interface is supposed just to bind the type of {@link SourceFunction} to + * {@link BEvent}. + * + * @author Andrea Burattin + */ +public abstract class BeamlineAbstractSource extends RichSourceFunction<BEvent> { + + private static final long serialVersionUID = 1072198158533070679L; + private boolean running = true; + + /** + * + * @return + */ + public boolean isRunning() { + return running; + } + + @Override + public void cancel() { + this.running = false; + } +} \ No newline at end of file diff --git a/src/main/java/beamline/sources/CSVLogSource.java b/src/main/java/beamline/sources/CSVLogSource.java index 9083f5fe3ebded6f5336dab68440bb33e899e8bb..b97d3bb6086f4ae0fd3678514c96528ee1e0af56 100644 --- a/src/main/java/beamline/sources/CSVLogSource.java +++ b/src/main/java/beamline/sources/CSVLogSource.java @@ -2,63 +2,53 @@ package beamline.sources; import java.io.IOException; import java.io.Reader; +import java.io.Serializable; import java.nio.file.Files; import java.nio.file.Paths; import java.util.LinkedList; import java.util.List; import org.apache.commons.lang3.tuple.Pair; -import org.deckfour.xes.model.XTrace; import com.opencsv.CSVParser; +import com.opencsv.CSVParserBuilder; import com.opencsv.CSVReader; import com.opencsv.CSVReaderBuilder; +import com.opencsv.ICSVParser; +import beamline.events.BEvent; import beamline.exceptions.SourceException; -import beamline.utils.EventUtils; -import io.reactivex.rxjava3.annotations.NonNull; -import io.reactivex.rxjava3.core.Observable; -import io.reactivex.rxjava3.core.ObservableEmitter; -import io.reactivex.rxjava3.core.ObservableOnSubscribe; /** - * This implementation of a {@link XesSource} produces events according to the - * events contained in a CSV file. This source produces a cold observable. + * This implementation of a {@link BeamlineAbstractSource} produces events + * according to the events contained in a CSV file. * * @author Andrea Burattin */ -public class CSVLogSource implements XesSource { +public class CSVLogSource extends BeamlineAbstractSource { - private CSVReader csvReader; + private static final long serialVersionUID = 205574514393782145L; + private CSVLogSource.ParserConfiguration parserConfiguration; private String filename; private int caseIdColumn; private int activityNameColumn; - private CSVParser parser; /** * Constructs the source by providing a CSV parser. * - * <p> - * A parser can be produced, for example with the following code: - * <pre> - * CSVParser parser = new CSVParserBuilder() - * .withSeparator(',') - * .withIgnoreQuotations(true) - * .build(); - * </pre> - * * @param filename the absolute path of the CSV file * @param caseIdColumn the id of the column containing the case id (counting * starts from 0) * @param activityNameColumn the id of the column containing the activity * name (counting starts from 0) - * @param parser the parser to be used for parsing the CSV file + * @param parserConfiguration the parser configuration to be used for + * parsing the CSV file */ - public CSVLogSource(String filename, int caseIdColumn, int activityNameColumn, CSVParser parser) { + public CSVLogSource(String filename, int caseIdColumn, int activityNameColumn, CSVLogSource.ParserConfiguration parserConfiguration) { this.filename = filename; this.caseIdColumn = caseIdColumn; this.activityNameColumn = activityNameColumn; - this.parser = parser; + this.parserConfiguration = parserConfiguration; } /** @@ -71,40 +61,58 @@ public class CSVLogSource implements XesSource { * name (counting starts from 0) */ public CSVLogSource(String filename, int caseIdColumn, int activityNameColumn) { - this(filename, caseIdColumn, activityNameColumn, null); + this(filename, caseIdColumn, activityNameColumn, new CSVLogSource.ParserConfiguration()); } - - @Override - public Observable<XTrace> getObservable() { - return Observable.create(new ObservableOnSubscribe<XTrace>() { - @Override - public void subscribe(@NonNull ObservableEmitter<@NonNull XTrace> emitter) throws Throwable { - String[] line; - while ((line = csvReader.readNext()) != null) { - List<Pair<String, String>> attributes = new LinkedList<>(); - for (int i = 0; i < line.length; i++) { - attributes.add(Pair.of("attribute_" + i, line[i])); - } - emitter.onNext(EventUtils.create(line[activityNameColumn], line[caseIdColumn], null, attributes)); - } - emitter.onComplete(); - } - }); - } - + @Override - public void prepare() throws SourceException { - Reader reader; + public void run(SourceContext<BEvent> ctx) throws Exception { + Reader reader = null; + CSVReader csvReader = null; try { + CSVParser parser = new CSVParserBuilder() + .withSeparator(parserConfiguration.separator) + .build(); reader = Files.newBufferedReader(Paths.get(filename)); + csvReader = new CSVReaderBuilder(reader) + .withCSVParser(parser) + .build(); + + String[] line; + while ((line = csvReader.readNext()) != null && isRunning()) { + List<Pair<String, String>> attributes = new LinkedList<>(); + for (int i = 0; i < line.length; i++) { + attributes.add(Pair.of("attribute_" + i, line[i])); + } + synchronized (ctx.getCheckpointLock()) { + ctx.collect(BEvent.create(filename, line[activityNameColumn], line[caseIdColumn], null, attributes)); + } + } } catch (IOException e) { throw new SourceException(e.getMessage()); + } finally { + if (csvReader != null) { + csvReader.close(); + } } - if (parser == null) { - csvReader = new CSVReader(reader); - } else { - csvReader = new CSVReaderBuilder(reader).withCSVParser(parser).build(); + } + + /** + * + * @author Andrea Burattin + */ + public static class ParserConfiguration implements Serializable { + + private static final long serialVersionUID = 375203248074405954L; + char separator = ICSVParser.DEFAULT_SEPARATOR; + + /** + * + * @param separator + * @return + */ + public ParserConfiguration withSeparator(char separator) { + this.separator = separator; + return this; } } - } diff --git a/src/main/java/beamline/sources/MQTTXesSource.java b/src/main/java/beamline/sources/MQTTXesSource.java index 40ee221fd8d2cc656a0e696cefbc161e824068e6..ef820e1f88745e31423e08bfe8705c7af84940e8 100644 --- a/src/main/java/beamline/sources/MQTTXesSource.java +++ b/src/main/java/beamline/sources/MQTTXesSource.java @@ -1,8 +1,9 @@ package beamline.sources; +import java.util.LinkedList; +import java.util.Queue; import java.util.UUID; -import org.deckfour.xes.model.XTrace; import org.eclipse.paho.client.mqttv3.IMqttClient; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; @@ -11,14 +12,12 @@ import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; +import beamline.events.BEvent; import beamline.exceptions.SourceException; -import beamline.utils.EventUtils; -import io.reactivex.rxjava3.core.Observable; -import io.reactivex.rxjava3.subjects.PublishSubject; /** - * This implementation of a {@link XesSource} produces events as they are - * observed in an MQTT-XES broker. This source produces a hot observable. + * This implementation of a {@link BeamlineAbstractSource} produces events as + * they are observed in an MQTT-XES broker. * * <p> * Example of usage: @@ -32,12 +31,12 @@ import io.reactivex.rxjava3.subjects.PublishSubject; * * @author Andrea Burattin */ -public class MQTTXesSource implements XesSource { +public class MQTTXesSource extends BeamlineAbstractSource { + private static final long serialVersionUID = 7849358403852399322L; private String processName; private String brokerHost; private String topicBase; - private PublishSubject<XTrace> ps; /** * Constructs the source @@ -50,21 +49,16 @@ public class MQTTXesSource implements XesSource { this.brokerHost = brokerHost; this.topicBase = topicBase; this.processName = processName; - this.ps = PublishSubject.create(); } @Override - public Observable<XTrace> getObservable() { - return ps; - } - - @Override - public void prepare() throws SourceException { + public void run(SourceContext<BEvent> ctx) throws Exception { + Queue<BEvent> buffer = new LinkedList<>(); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); options.setKeepAliveInterval(30); - - IMqttClient myClient; + + IMqttClient myClient = null; try { myClient = new MqttClient(brokerHost, UUID.randomUUID().toString()); myClient.setCallback(new MqttCallback() { @@ -75,7 +69,8 @@ public class MQTTXesSource implements XesSource { String partBeforeActName = topic.substring(0, posLastSlash); String activityName = topic.substring(posLastSlash + 1); String caseId = partBeforeActName.substring(partBeforeActName.lastIndexOf("/") + 1); - ps.onNext(EventUtils.create(activityName, caseId)); + BEvent b = BEvent.create(processName, activityName, caseId); + buffer.add(b); } @Override @@ -93,6 +88,25 @@ public class MQTTXesSource implements XesSource { } catch (MqttException e) { throw new SourceException(e.getMessage()); } + + while(isRunning()) { + while (isRunning() && buffer.isEmpty()) { + Thread.sleep(100l); + } + if (isRunning()) { + synchronized (ctx.getCheckpointLock()) { + BEvent e = buffer.poll(); + ctx.collect(e); + } + } + } + + if (!isRunning() && myClient.isConnected()) { + try { + myClient.disconnect(); + } catch (MqttException e) { + // nothing to do here + } + } } - } diff --git a/src/main/java/beamline/sources/Source.java b/src/main/java/beamline/sources/Source.java deleted file mode 100644 index 5c0772fa3f5b74c035048d0072eecd7b8824008f..0000000000000000000000000000000000000000 --- a/src/main/java/beamline/sources/Source.java +++ /dev/null @@ -1,36 +0,0 @@ -package beamline.sources; - -import beamline.exceptions.SourceException; -import io.reactivex.rxjava3.core.Observable; - -/** - * This interface is the base type that should be extended by all sources to be - * used in the framework. When using a source implementing this type, the method - * {@link #prepare()} should be called <strong>before</strong> - * {@link #getObservable()}. - * - * @author Andrea Burattin - * - * @param <T> the type of observable objects this interface will produce. - */ -public interface Source<T> { - - /** - * This method returns the observable created by the source. Before calling - * this method, it is important to prepare the source by calling the - * {@link #prepare()} method. - * - * @return the {@link Observable} - */ - public Observable<T> getObservable(); - - /** - * This method is supposed to be called before the {@link #getObservable()} - * one: it is in charge of preparing the source to be processed. - * - * @throws SourceException while preparing the source, it is important to be - * aware that some sources may generate specific exceptions (e.g., file not - * found, network problems). - */ - public void prepare() throws SourceException; -} diff --git a/src/main/java/beamline/sources/StringTestSource.java b/src/main/java/beamline/sources/StringTestSource.java new file mode 100644 index 0000000000000000000000000000000000000000..613adb619066291ccf56c9c379344a73f6fe7a23 --- /dev/null +++ b/src/main/java/beamline/sources/StringTestSource.java @@ -0,0 +1,22 @@ +package beamline.sources; + +import beamline.events.BEvent; + +public class StringTestSource extends BeamlineAbstractSource { + + private static final long serialVersionUID = 7657971352128040279L; + private String[] traces; + + public StringTestSource(String...traces) { + this.traces = traces; + } + + @Override + public void run(SourceContext<BEvent> ctx) throws Exception { + for (int j = 0; j < traces.length; j++) { + for (int i = 0; i < traces[j].length(); i++) { + ctx.collect(BEvent.create("test-process", traces[j].substring(i, i+1), "case-"+j)); + } + } + } +} diff --git a/src/main/java/beamline/sources/XesLogSource.java b/src/main/java/beamline/sources/XesLogSource.java index 4308ca84e1adb044b2db6bb35f8a73dec263c80d..7af0475830ee40c86fec041f10f22e5ff633c823 100644 --- a/src/main/java/beamline/sources/XesLogSource.java +++ b/src/main/java/beamline/sources/XesLogSource.java @@ -1,44 +1,44 @@ package beamline.sources; import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; import java.util.Collections; -import java.util.Date; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import org.deckfour.xes.extension.std.XConceptExtension; import org.deckfour.xes.extension.std.XTimeExtension; -import org.deckfour.xes.factory.XFactory; -import org.deckfour.xes.factory.XFactoryNaiveImpl; +import org.deckfour.xes.in.XMxmlGZIPParser; +import org.deckfour.xes.in.XMxmlParser; import org.deckfour.xes.in.XParser; import org.deckfour.xes.in.XesXmlGZIPParser; import org.deckfour.xes.in.XesXmlParser; import org.deckfour.xes.model.XAttribute; -import org.deckfour.xes.model.XAttributeMap; import org.deckfour.xes.model.XEvent; import org.deckfour.xes.model.XLog; import org.deckfour.xes.model.XTrace; +import org.deckfour.xes.out.XesXmlGZIPSerializer; +import beamline.events.BEvent; +import beamline.exceptions.EventException; import beamline.exceptions.SourceException; -import io.reactivex.rxjava3.annotations.NonNull; -import io.reactivex.rxjava3.core.Observable; -import io.reactivex.rxjava3.core.ObservableEmitter; -import io.reactivex.rxjava3.core.ObservableOnSubscribe; /** - * This implementation of a {@link XesSource} produces events according to the - * events contained in an {@link XLog}. The events are first sorted according to - * their timestamp and then sent. This source produces a cold observable. + * This implementation of a {@link BeamlineAbstractSource} produces events according to + * the events contained in an {@link XLog}. The events are first sorted + * according to their timestamp and then sent. * * @author Andrea Burattin */ -public class XesLogSource implements XesSource { +public class XesLogSource extends BeamlineAbstractSource { + + private static final long serialVersionUID = 1095855454671335981L; - private static XFactory xesFactory = new XFactoryNaiveImpl(); - private String fileName; - private XLog log; - private List<XTrace> events; + private List<BEvent> events; /** * Constructs a source from the path of a log @@ -56,83 +56,97 @@ public class XesLogSource implements XesSource { * Constructs a source from the given log * * @param log the log to use as source + * @throws IOException */ - public XesLogSource(XLog log) { - this.log = log; + public XesLogSource(XLog log) throws IOException { + File tmpFile = File.createTempFile("file", ".xes.gz"); + new XesXmlGZIPSerializer().serialize(log, new FileOutputStream(tmpFile)); + this.fileName = tmpFile.getAbsolutePath(); } @Override - public Observable<XTrace> getObservable() { - return Observable.create(new ObservableOnSubscribe<XTrace>() { - @Override - public void subscribe(@NonNull ObservableEmitter<@NonNull XTrace> emitter) throws Throwable { - for (XTrace wrapper : events) { - emitter.onNext(wrapper); + public void run(SourceContext<BEvent> ctx) throws Exception { + if (events == null) { + prepareStream(parseLog(fileName)); + } + Iterator<BEvent> i = events.iterator(); + while(i.hasNext() && isRunning()) { + BEvent event = i.next(); + if (event.getEventTime() != null) { + synchronized (ctx.getCheckpointLock()) { + ctx.collectWithTimestamp(event, event.getEventTime().getTime()); + } + } else { + synchronized (ctx.getCheckpointLock()) { + ctx.collect(i.next()); } - emitter.onComplete(); } - }); - } - - @Override - public void prepare() throws SourceException { - if (log == null) { - parseLog(fileName); } - prepareStream(); } - private void parseLog(String fileName) throws SourceException { - XParser[] parsers = new XParser[] { new XesXmlGZIPParser(), new XesXmlParser() }; + private XLog parseLog(String fileName) throws SourceException { + XParser[] parsers = new XParser[] { + new XesXmlGZIPParser(), + new XesXmlParser(), + new XMxmlParser(), + new XMxmlGZIPParser() }; File file = new File(fileName); for (XParser p : parsers) { if (p.canParse(file)) { try { - log = p.parse(file).get(0); + return p.parse(file).get(0); } catch (Exception e) { throw new SourceException(e.getMessage()); } - return; } } throw new SourceException("XES file format not supported"); } - private void prepareStream() throws SourceException { + private void prepareStream(XLog log) throws SourceException, EventException { if (log.isEmpty()) { throw new SourceException("The given log is empty"); } + + // construct the process name + String processName = XConceptExtension.instance().extractName(log); + if (processName == null) { + processName = "unnamed-xes-process"; + if (fileName != null) { + processName = fileName; + } + } + // populate all events events = new LinkedList<>(); for (XTrace t : log) { for (XEvent e : t) { - // create the wrapping trace - XTrace eventWrapper = xesFactory.createTrace(); - XAttributeMap am = t.getAttributes(); - for (Map.Entry<String, XAttribute> v : am.entrySet()) { - eventWrapper.getAttributes().put(v.getKey(), v.getValue()); + BEvent be = BEvent.create( + processName, + XConceptExtension.instance().extractName(e), + XConceptExtension.instance().extractName(t), + XTimeExtension.instance().extractTimestamp(e)); + + // log attributes + for (Map.Entry<String, XAttribute> v : log.getAttributes().entrySet()) { + be.setLogAttribute(v.getKey(), v.getValue()); + } + + // trace attributes + for (Map.Entry<String, XAttribute> v : t.getAttributes().entrySet()) { + be.setTraceAttribute(v.getKey(), v.getValue()); } - // create the actual event - XEvent newEvent = xesFactory.createEvent(); - XAttributeMap amEvent = e.getAttributes(); - for (Map.Entry<String, XAttribute> v : amEvent.entrySet()) { - newEvent.getAttributes().put(v.getKey(), v.getValue()); + + // event attributes + for (Map.Entry<String, XAttribute> v : e.getAttributes().entrySet()) { + be.setEventAttribute(v.getKey(), v.getValue()); } - eventWrapper.add(newEvent); - events.add(eventWrapper); + + events.add(be); } } // sort events - Collections.sort(events, (XTrace o1, XTrace o2) -> { - XEvent e1 = o1.get(0); - XEvent e2 = o2.get(0); - Date d1 = XTimeExtension.instance().extractTimestamp(e1); - Date d2 = XTimeExtension.instance().extractTimestamp(e2); - if (d1 == null || d2 == null) { - return 0; - } - return d1.compareTo(d2); - }); + Collections.sort(events); } } diff --git a/src/main/java/beamline/sources/XesSource.java b/src/main/java/beamline/sources/XesSource.java deleted file mode 100644 index 6ba5c65695640cb91e8b9b6f1fb01649ade1ba36..0000000000000000000000000000000000000000 --- a/src/main/java/beamline/sources/XesSource.java +++ /dev/null @@ -1,13 +0,0 @@ -package beamline.sources; - -import org.deckfour.xes.model.XTrace; - -/** - * This interface is supposed just to bind the type of {@link Source} to - * {@link XTrace}. - * - * @author Andrea Burattin - */ -public interface XesSource extends Source<XTrace> { - -} \ No newline at end of file diff --git a/src/main/java/beamline/utils/EventUtils.java b/src/main/java/beamline/utils/EventUtils.java deleted file mode 100644 index 1e3037f2299b3ddb9de48277755fd5cb5d9e153f..0000000000000000000000000000000000000000 --- a/src/main/java/beamline/utils/EventUtils.java +++ /dev/null @@ -1,109 +0,0 @@ -package beamline.utils; - -import java.util.Collection; -import java.util.Date; - -import org.apache.commons.lang3.tuple.Pair; -import org.deckfour.xes.extension.std.XConceptExtension; -import org.deckfour.xes.extension.std.XTimeExtension; -import org.deckfour.xes.factory.XFactory; -import org.deckfour.xes.factory.XFactoryNaiveImpl; -import org.deckfour.xes.model.XEvent; -import org.deckfour.xes.model.XTrace; -import org.deckfour.xes.model.impl.XAttributeLiteralImpl; - -import beamline.exceptions.EventException; - -/** - * This class contains some utility methods useful to handle and process events. - * - * @author Andrea Burattin - */ -public class EventUtils { - - private static final XFactory xesFactory = new XFactoryNaiveImpl(); - - private EventUtils() { } - - /** - * Creates a new {@link XTrace} referring to one event - * - * @param activityName the name of the activity - * @param caseId the identifier of the process instance - * @param time the time when the event has happened - * @param eventAttributes a collection of string attributes for the event - * @return the new event - * @throws EventException this exception is thrown is incomplete information - * is provided - */ - public static XTrace create(String activityName, String caseId, Date time, Collection<Pair<String, String>> eventAttributes) throws EventException { - if (activityName == null || caseId == null) { - throw new EventException("Activity name or case id missing"); - } - - XEvent event = xesFactory.createEvent(); - XConceptExtension.instance().assignName(event, activityName); - if (time == null) { - XTimeExtension.instance().assignTimestamp(event, new Date()); - } else { - XTimeExtension.instance().assignTimestamp(event, time); - } - if (eventAttributes != null) { - for(Pair<String, String> a : eventAttributes) { - event.getAttributes().put(a.getLeft(), new XAttributeLiteralImpl(a.getLeft(), a.getRight())); - } - } - XTrace eventWrapper = xesFactory.createTrace(); - XConceptExtension.instance().assignName(eventWrapper, caseId); - eventWrapper.add(event); - return eventWrapper; - } - - /** - * Creates a new {@link XTrace} referring to one event - * - * @param activityName the name of the activity - * @param caseId the identifier of the process instance - * @param time the time when the event has happened - * @return the new event - * @throws EventException this exception is thrown is incomplete information - * is provided - */ - public static XTrace create(String activityName, String caseId, Date time) throws EventException { - return create(activityName, caseId, time, null); - } - - /** - * Creates a new {@link XTrace} referring to one event. The time of the - * event is set to the current time - * - * @param activityName the name of the activity - * @param caseId the identifier of the process instance - * @return the new event - * @throws EventException this exception is thrown is incomplete information - * is provided - */ - public static XTrace create(String activityName, String caseId) throws EventException { - return create(activityName, caseId, null, null); - } - - /** - * Extracts the activity name - * - * @param event the event - * @return the activity name - */ - public static String getActivityName(XTrace event) { - return XConceptExtension.instance().extractName(event.get(0)); - } - - /** - * Extracts the case id - * - * @param event the event - * @return the case id - */ - public static String getCaseId(XTrace event) { - return XConceptExtension.instance().extractName(event); - } -} diff --git a/src/main/java/beamline/utils/package-info.java b/src/main/java/beamline/utils/package-info.java deleted file mode 100644 index 6db5f83602ebb6e9bd2de0a302b9831a3b7817c7..0000000000000000000000000000000000000000 --- a/src/main/java/beamline/utils/package-info.java +++ /dev/null @@ -1,4 +0,0 @@ -/** - * This package contains utility classes to be used throughout the framework. - */ -package beamline.utils; \ No newline at end of file diff --git a/src/test/java/beamline/tests/AlgorithmTest.java b/src/test/java/beamline/tests/AlgorithmTest.java index 4ce3dee9c83f4ccc594f93170ce00520d5c23fa7..53c284315e64d3ff024c538712fb9a79f42019c2 100644 --- a/src/test/java/beamline/tests/AlgorithmTest.java +++ b/src/test/java/beamline/tests/AlgorithmTest.java @@ -4,62 +4,58 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasItems; import static org.junit.jupiter.api.Assertions.assertEquals; -import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.junit.jupiter.api.Test; +import beamline.events.BEvent; import beamline.models.algorithms.StreamMiningAlgorithm; -import io.reactivex.rxjava3.core.Observable; +import beamline.models.responses.StringResponse; public class AlgorithmTest { - Observable<Integer> o = Observable.just(3, 7, 11, 13); - @Test - public void test_result() { - StreamMiningAlgorithm<Integer, Integer> m = new StreamMiningAlgorithm<Integer, Integer>() { - public Integer product = 1; - + public void test_result_1() throws Exception { + StreamMiningAlgorithm<StringResponse> m = new StreamMiningAlgorithm<StringResponse>() { + private static final long serialVersionUID = -8445717838576941924L; + @Override - public Integer ingest(Integer event) { - product *= event; - setLatestResponse(-product); - return product; + public StringResponse ingest(BEvent event) throws Exception { + return new StringResponse(event.getProcessName() + event.getEventName() + event.getTraceName()); } }; - o.subscribe(m); - assertEquals(4, m.getProcessedEvents()); - assertEquals(3003, m.getLatestResponse()); + List<String> events = new LinkedList<>(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env + .fromElements( + BEvent.create("p", "3", "c1"), + BEvent.create("p", "7", "c1"), + BEvent.create("p", "11", "c1"), + BEvent.create("p", "13", "c1")) + .keyBy(BEvent::getProcessName) + .flatMap(m) + .executeAndCollect().forEachRemaining((StringResponse e) -> { + events.add(e.get()); + }); + + assertEquals(4, events.size()); + assertThat(events, hasItems("p3c1","p7c1","p11c1","p13c1")); } @Test - public void test_hooks() { - StreamMiningAlgorithm<Integer, Integer> m = new StreamMiningAlgorithm<Integer, Integer>() { - public Integer product = 1; - + public void test_result_2() throws Exception { + StreamMiningAlgorithm<StringResponse> m = new StreamMiningAlgorithm<StringResponse>() { + private static final long serialVersionUID = -8445717838576941924L; + @Override - public Integer ingest(Integer event) { - product *= event; - setLatestResponse(-product); - return product; + public StringResponse ingest(BEvent event) throws Exception { + return new StringResponse(event.getProcessName() + event.getEventName() + event.getTraceName()); } }; - List<Integer> resultsBefore = new ArrayList<Integer>(); - m.setOnBeforeEvent(() -> { - resultsBefore.add(m.getProcessedEvents()); - }); - - List<Integer> resultsAfter = new ArrayList<Integer>(); - m.setOnAfterEvent(() -> { - resultsAfter.add(m.getProcessedEvents()); - }); - - o.subscribe(m); - - assertThat(resultsBefore, hasItems(0,1,2,3)); - assertThat(resultsAfter, hasItems(1,2,3,4)); + assertEquals(-1, m.getProcessedEvents()); } } diff --git a/src/test/java/beamline/tests/FiltersTest.java b/src/test/java/beamline/tests/FiltersTest.java index c8fa5c09a1e43aee52b33201533672896e28dbb9..591cc3f15e6f610a00584ddb161067d7cceb2c4a 100644 --- a/src/test/java/beamline/tests/FiltersTest.java +++ b/src/test/java/beamline/tests/FiltersTest.java @@ -1,90 +1,61 @@ package beamline.tests; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.hasItems; -import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; -import java.util.ArrayList; -import java.util.List; - -import org.deckfour.xes.model.impl.XAttributeLiteralImpl; import org.junit.jupiter.api.Test; +import beamline.events.BEvent; +import beamline.exceptions.EventException; import beamline.filters.ExcludeActivitiesFilter; import beamline.filters.ExcludeOnCaseAttributeEqualityFilter; import beamline.filters.RetainActivitiesFilter; import beamline.filters.RetainOnCaseAttributeEqualityFilter; -import beamline.utils.EventUtils; public class FiltersTest { @Test - public void test_exclude_activities_on_name_filter() { - List<String> results = new ArrayList<String>(); - Utils.generateObservableSameCaseId() - .filter(new ExcludeActivitiesFilter("A")) - .subscribe((t) -> results.add(EventUtils.getActivityName(t))); - assertEquals(3, results.size()); - assertThat(results, hasItems("K","B","C")); - } - - @Test - public void test_retain_activities_on_name_filter() { - List<String> results = new ArrayList<String>(); - Utils.generateObservableSameCaseId() - .filter(new RetainActivitiesFilter("A","B")) - .subscribe((t) -> results.add(EventUtils.getActivityName(t))); - assertEquals(3, results.size()); - assertThat(results, hasItems("A","B","A")); + public void test_exclude_activities_on_name_filter() throws EventException, Exception { + ExcludeActivitiesFilter f = new ExcludeActivitiesFilter("A"); + + assertTrue(f.filter(BEvent.create("", "B", ""))); + assertFalse(f.filter(BEvent.create("", "A", ""))); } @Test - public void test_retain_activities_on_case_attribute_filter_1() { - List<String> results = new ArrayList<String>(); - Utils.generateObservableSameCaseId() - .filter(new RetainOnCaseAttributeEqualityFilter<XAttributeLiteralImpl>( - "a1", - new XAttributeLiteralImpl("a1", "v1"))) - .subscribe((t) -> results.add(EventUtils.getActivityName(t))); - assertEquals(1, results.size()); - assertThat(results, hasItems("A")); + public void test_retain_activities_on_name_filter() throws EventException { + RetainActivitiesFilter f = new RetainActivitiesFilter("A"); + + assertTrue(f.filter(BEvent.create("", "A", ""))); + assertFalse(f.filter(BEvent.create("", "B", ""))); } @Test - public void test_retain_activities_on_case_attribute_filter_2() { - List<String> results = new ArrayList<String>(); - Utils.generateObservableSameCaseId() - .filter(new RetainOnCaseAttributeEqualityFilter<XAttributeLiteralImpl>( - "a1", - new XAttributeLiteralImpl("a1", "v1"), - new XAttributeLiteralImpl("a1", "v4"))) - .subscribe((t) -> results.add(EventUtils.getActivityName(t))); - assertEquals(2, results.size()); - assertThat(results, hasItems("A","C")); + public void test_retain_activities_on_case_attribute_filter() throws EventException { + BEvent e1 = BEvent.create("", "", ""); + BEvent e2 = BEvent.create("", "", ""); + + e1.getTraceAttributes().put("a", "v1"); + e2.getTraceAttributes().put("a", "v2"); + + RetainOnCaseAttributeEqualityFilter<String> f = new RetainOnCaseAttributeEqualityFilter<String>("a", "v1"); + + assertTrue(f.filter(e1)); + assertFalse(f.filter(e2)); } @Test - public void test_exclude_activities_on_case_attribute_filter_1() { - List<String> results = new ArrayList<String>(); - Utils.generateObservableSameCaseId() - .filter(new ExcludeOnCaseAttributeEqualityFilter<XAttributeLiteralImpl>( - "a1", - new XAttributeLiteralImpl("a1", "v1"))) - .subscribe((t) -> results.add(EventUtils.getActivityName(t))); - assertEquals(4, results.size()); - assertThat(results, hasItems("K","B","A","C")); + public void test_exclude_activities_on_case_attribute_filter() throws EventException { + BEvent e1 = BEvent.create("", "", ""); + BEvent e2 = BEvent.create("", "", ""); + + e1.getTraceAttributes().put("a", "v1"); + e2.getTraceAttributes().put("a", "v2"); + + ExcludeOnCaseAttributeEqualityFilter<String> f = new ExcludeOnCaseAttributeEqualityFilter<String>("a", "v1"); + + assertFalse(f.filter(e1)); + assertTrue(f.filter(e2)); } - @Test - public void test_exclude_activities_on_case_attribute_filter_2() { - List<String> results = new ArrayList<String>(); - Utils.generateObservableSameCaseId() - .filter(new ExcludeOnCaseAttributeEqualityFilter<XAttributeLiteralImpl>( - "a1", - new XAttributeLiteralImpl("a1", "v1"), - new XAttributeLiteralImpl("a1", "v4"))) - .subscribe((t) -> results.add(EventUtils.getActivityName(t))); - assertEquals(3, results.size()); - assertThat(results, hasItems("K","B","A")); - } } diff --git a/src/test/java/beamline/tests/MapperTest.java b/src/test/java/beamline/tests/MapperTest.java index 59d758e0ba19e428f43f2f3c3adab34162daefb4..f4dc59165ea1693d26b013e6f18a6258ec3bc8b0 100644 --- a/src/test/java/beamline/tests/MapperTest.java +++ b/src/test/java/beamline/tests/MapperTest.java @@ -6,25 +6,43 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; import java.util.List; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.junit.jupiter.api.Test; -import beamline.mappers.DirectlyFollowsRelation; -import beamline.mappers.InfiniteSizeDirectlyFollowsMapper; +import beamline.events.BEvent; +import beamline.exceptions.EventException; +import beamline.models.algorithms.InfiniteSizeDirectlyFollowsMapper; +import beamline.models.responses.DirectlyFollowsRelation; public class MapperTest { @Test - public void test_infinite_size_df() { + public void test_infinite_size_df() throws EventException, Exception { List<DirectlyFollowsRelation> results = new ArrayList<>(); - // <K,A,B,A,C> - Utils.generateObservableSameCaseId() + // <K,A,B,A,C>, <A,B,A> + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env + .fromElements( + BEvent.create("p", "K", "c"), + BEvent.create("p", "A", "c2"), + BEvent.create("p", "A", "c"), + BEvent.create("p", "B", "c"), + BEvent.create("p", "B", "c2"), + BEvent.create("p", "A", "c"), + BEvent.create("p", "A", "c2"), + BEvent.create("p", "C", "c")) + .keyBy(BEvent::getProcessName) .flatMap(new InfiniteSizeDirectlyFollowsMapper()) - .subscribe((df) -> results.add(df)); + .executeAndCollect().forEachRemaining((DirectlyFollowsRelation e) -> { + results.add(e); + }); - assertEquals(4, results.size()); + assertEquals(6, results.size()); assertTrue(Utils.verifyDirectFollows(results.get(0), "K", "A", "c")); assertTrue(Utils.verifyDirectFollows(results.get(1), "A", "B", "c")); - assertTrue(Utils.verifyDirectFollows(results.get(2), "B", "A", "c")); - assertTrue(Utils.verifyDirectFollows(results.get(3), "A", "C", "c")); + assertTrue(Utils.verifyDirectFollows(results.get(2), "A", "B", "c2")); + assertTrue(Utils.verifyDirectFollows(results.get(3), "B", "A", "c")); + assertTrue(Utils.verifyDirectFollows(results.get(4), "B", "A", "c2")); + assertTrue(Utils.verifyDirectFollows(results.get(5), "A", "C", "c")); } } diff --git a/src/test/java/beamline/tests/SourcesTest.java b/src/test/java/beamline/tests/SourcesTest.java index 3fff9050f9f0d2129aff24f6f8e0faf231e78bad..95d18d441df00775183484ae24bd01ee20c6a0aa 100644 --- a/src/test/java/beamline/tests/SourcesTest.java +++ b/src/test/java/beamline/tests/SourcesTest.java @@ -2,45 +2,62 @@ package beamline.tests; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasItems; +import static org.junit.Assert.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrowsExactly; +import java.io.File; import java.net.URI; import java.nio.charset.StandardCharsets; +import java.nio.file.StandardOpenOption; import java.util.LinkedList; import java.util.List; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttPersistenceException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.junit.jupiter.api.Test; -import com.opencsv.CSVParserBuilder; - -import beamline.exceptions.SourceException; +import beamline.events.BEvent; import beamline.sources.CSVLogSource; import beamline.sources.MQTTXesSource; +import beamline.sources.StringTestSource; import beamline.sources.XesLogSource; -import beamline.utils.EventUtils; public class SourcesTest { @Test - public void test_csv_source_1() { + public void test_string_test_source() throws Exception { + List<String> acts = new LinkedList<>(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<BEvent> stream = env.addSource(new StringTestSource("ABCDA")); + stream.executeAndCollect().forEachRemaining((BEvent e) -> { + acts.add(e.getEventName()); + }); + + assertEquals(5, acts.size()); + + assertThat(acts, hasItems("A","B","C","D","A")); + } + + @Test + public void test_csv_source_1() throws Exception { List<String> acts = new LinkedList<>(); List<String> caseIds = new LinkedList<>(); - CSVLogSource s = new CSVLogSource("src/test/resources/sources/source.csv", 0, 1); - try { - s.prepare(); - } catch (SourceException e) { - e.printStackTrace(); - } - s.getObservable().subscribe((t) -> { - acts.add(EventUtils.getActivityName(t)); - caseIds.add(EventUtils.getCaseId(t)); + CSVLogSource source = new CSVLogSource("src/test/resources/sources/source.csv", 0, 1); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<BEvent> stream = env.addSource(source); + stream.executeAndCollect().forEachRemaining((BEvent e) -> { + acts.add(e.getEventName()); + caseIds.add(e.getTraceName()); }); assertEquals(5, acts.size()); @@ -51,22 +68,20 @@ public class SourcesTest { } @Test - public void test_csv_source_2() { + public void test_csv_source_2() throws Exception { List<String> acts = new LinkedList<>(); List<String> caseIds = new LinkedList<>(); - CSVLogSource s = new CSVLogSource( + CSVLogSource source = new CSVLogSource( "src/test/resources/sources/source_2.csv", 0, 1, - new CSVParserBuilder().withSeparator('|').build()); - try { - s.prepare(); - } catch (SourceException e) { - e.printStackTrace(); - } - s.getObservable().subscribe((t) -> { - acts.add(EventUtils.getActivityName(t)); - caseIds.add(EventUtils.getCaseId(t)); + new CSVLogSource.ParserConfiguration().withSeparator('|')); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<BEvent> stream = env.addSource(source); + stream.executeAndCollect().forEachRemaining((BEvent e) -> { + acts.add(e.getEventName()); + caseIds.add(e.getTraceName()); }); assertEquals(5, acts.size()); @@ -78,35 +93,49 @@ public class SourcesTest { @Test public void test_csv_source_3() { - CSVLogSource s = new CSVLogSource("DOESNT_EXIST", 0, 1); - assertThrowsExactly(SourceException.class, () -> s.prepare()); + CSVLogSource source = new CSVLogSource("DOESNT_EXIST", 0, 1); + assertThrowsExactly(JobExecutionException.class, () -> { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.addSource(source).map(e -> e).print(); + env.execute(); + }); } @Test public void test_xes_source_1() { XesLogSource s1 = new XesLogSource("src/test/resources/sources/empty.xes"); - assertThrowsExactly(SourceException.class, () -> s1.prepare()); + assertThrowsExactly(JobExecutionException.class, () -> { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.addSource(s1).map(e -> e).print(); + env.execute(); + }); XesLogSource s2 = new XesLogSource("src/test/resources/sources/empty_2.xes"); - assertThrowsExactly(SourceException.class, () -> s2.prepare()); + assertThrowsExactly(JobExecutionException.class, () -> { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.addSource(s2).map(e -> e).print(); + env.execute(); + }); XesLogSource s3 = new XesLogSource("src/test/resources/sources/empty.csv"); - assertThrowsExactly(SourceException.class, () -> s3.prepare()); + assertThrowsExactly(JobExecutionException.class, () -> { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.addSource(s3).map(e -> e).print(); + env.execute(); + }); } @Test - public void test_xes_source_2() { + public void test_xes_source_2() throws Exception { List<String> acts = new LinkedList<>(); List<String> caseIds = new LinkedList<>(); - XesLogSource s = new XesLogSource(Utils.generteXLog()); - try { - s.prepare(); - } catch (SourceException e) { - e.printStackTrace(); - } - s.getObservable().subscribe((t) -> { - acts.add(EventUtils.getActivityName(t)); - caseIds.add(EventUtils.getCaseId(t)); + XesLogSource source = new XesLogSource(Utils.generteXLog()); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<BEvent> stream = env.addSource(source); + stream.executeAndCollect().forEachRemaining((BEvent e) -> { + acts.add(e.getEventName()); + caseIds.add(e.getTraceName()); }); assertEquals(9, acts.size()); @@ -117,18 +146,36 @@ public class SourcesTest { } @Test - public void test_xes_source_3() { + public void test_xes_source_3() throws Exception { List<String> acts = new LinkedList<>(); List<String> caseIds = new LinkedList<>(); XesLogSource s = new XesLogSource("src/test/resources/sources/source.xes.gz"); - try { - s.prepare(); - } catch (SourceException e) { - e.printStackTrace(); - } - s.getObservable().subscribe((t) -> { - acts.add(EventUtils.getActivityName(t)); - caseIds.add(EventUtils.getCaseId(t)); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<BEvent> stream = env.addSource(s); + stream.executeAndCollect().forEachRemaining((BEvent e) -> { + acts.add(e.getEventName()); + caseIds.add(e.getTraceName()); + }); + + assertEquals(5, acts.size()); + assertEquals(5, caseIds.size()); + + assertThat(acts, hasItems("a11","a21","a22","a12","a23")); + assertThat(caseIds, hasItems("c1","c2","c2","c1","c2")); + } + + @Test + public void test_xes_source_4() throws Exception { + List<String> acts = new LinkedList<>(); + List<String> caseIds = new LinkedList<>(); + XesLogSource s = new XesLogSource("src/test/resources/sources/source_2.xes"); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<BEvent> stream = env.addSource(s); + stream.executeAndCollect().forEachRemaining((BEvent e) -> { + acts.add(e.getEventName()); + caseIds.add(e.getTraceName()); }); assertEquals(5, acts.size()); @@ -146,42 +193,52 @@ public class SourcesTest { brokerService.start(); brokerService.waitUntilStarted(); - List<String> acts = new LinkedList<>(); - List<String> caseIds = new LinkedList<>(); + MQTTXesSource s = new MQTTXesSource("tcp://localhost:9999", "test", "name"); + + // create the sink file + final File tmpFile = File.createTempFile("mqtt", "log"); // create actual source - MQTTXesSource source = new MQTTXesSource("tcp://localhost:9999", "test", "name"); - source.prepare(); - source.getObservable().subscribe((t) -> { - acts.add(EventUtils.getActivityName(t)); - caseIds.add(EventUtils.getCaseId(t)); - }); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env + .addSource(s) + .keyBy(BEvent::getProcessName) + .addSink(new RichSinkFunction<BEvent>() { + private static final long serialVersionUID = -8658786866403985570L; + @Override + public void invoke(BEvent value, Context context) throws Exception { + String toWrite = value.getProcessName() + "-" + value.getTraceName() + "-" + value.getEventName() + "/"; + java.nio.file.Files.write(tmpFile.toPath(), toWrite.getBytes(), StandardOpenOption.APPEND); + } + }); +// JobClient job = env.executeAsync(); + env.executeAsync(); + + Thread.sleep(2000); + + System.out.println("going"); MqttClient client = new MqttClient("tcp://localhost:9999", "clientid", new MemoryPersistence()); client.connect(); - publish(client, "c1", "a11"); publish(client, "c2", "a21"); publish(client, "c2", "a22"); publish(client, "c1", "a12"); publish(client, "c2", "a23"); - Thread.sleep(100); - - assertThat(acts, hasItems("a11","a21","a22","a12","a23")); - assertThat(caseIds, hasItems("c1","c2","c2","c1","c2")); - + Thread.sleep(2000); +// job.cancel(); + System.out.println("Done"); + assertEquals( + "name-c1-a11/name-c2-a21/name-c2-a22/name-c1-a12/name-c2-a23/", + org.apache.commons.io.FileUtils.readFileToString(tmpFile, "utf-8")); + } catch (Exception e) { e.printStackTrace(); + assertTrue(false); // error } } - @Test - public void test_mqtt_2() { - MQTTXesSource source = new MQTTXesSource("tcp://localhost:1", "test", "name"); - assertThrowsExactly(SourceException.class, () -> source.prepare()); - } - protected void publish(MqttClient client, String caseId, String activityName) throws MqttPersistenceException, MqttException { client.publish("test/name/" + caseId + "/" + activityName, "{}".getBytes(StandardCharsets.UTF_8), 1, false); } diff --git a/src/test/java/beamline/tests/Utils.java b/src/test/java/beamline/tests/Utils.java index 123f1eab1220ff5644b2e4381c2b58b8ac04e61c..e72b50574eefc8c1dc13c74ac7c8a1f2d73dcc4d 100644 --- a/src/test/java/beamline/tests/Utils.java +++ b/src/test/java/beamline/tests/Utils.java @@ -6,12 +6,8 @@ import org.deckfour.xes.factory.XFactory; import org.deckfour.xes.factory.XFactoryNaiveImpl; import org.deckfour.xes.model.XLog; import org.deckfour.xes.model.XTrace; -import org.deckfour.xes.model.impl.XAttributeLiteralImpl; -import beamline.exceptions.EventException; -import beamline.mappers.DirectlyFollowsRelation; -import beamline.utils.EventUtils; -import io.reactivex.rxjava3.core.Observable; +import beamline.models.responses.DirectlyFollowsRelation; public class Utils { @@ -25,25 +21,25 @@ public class Utils { * - A * - C / trace attribute: (a1,v4) */ - public static Observable<XTrace> generateObservableSameCaseId() { - XTrace[] events = null; - try { - events = new XTrace[] { - EventUtils.create("K", "c"), - EventUtils.create("A", "c"), - EventUtils.create("B", "c"), - EventUtils.create("A", "c"), - EventUtils.create("C", "c") - }; - } catch (EventException e) { - e.printStackTrace(); - } - events[1].getAttributes().put("a1", new XAttributeLiteralImpl("a1", "v1")); - events[2].get(0).getAttributes().put("a2", new XAttributeLiteralImpl("a2", "v3")); - events[3].get(0).getAttributes().put("a2", new XAttributeLiteralImpl("a2", "v2")); - events[4].getAttributes().put("a1", new XAttributeLiteralImpl("a1", "v4")); - return Observable.fromArray(events); - } +// public static Observable<XTrace> generateObservableSameCaseId() { +// XTrace[] events = null; +// try { +// events = new XTrace[] { +// EventUtils.create("K", "c"), +// EventUtils.create("A", "c"), +// EventUtils.create("B", "c"), +// EventUtils.create("A", "c"), +// EventUtils.create("C", "c") +// }; +// } catch (EventException e) { +// e.printStackTrace(); +// } +// events[1].getAttributes().put("a1", new XAttributeLiteralImpl("a1", "v1")); +// events[2].get(0).getAttributes().put("a2", new XAttributeLiteralImpl("a2", "v3")); +// events[3].get(0).getAttributes().put("a2", new XAttributeLiteralImpl("a2", "v2")); +// events[4].getAttributes().put("a1", new XAttributeLiteralImpl("a1", "v4")); +// return Observable.fromArray(events); +// } /* * c1: <K,A,B,A,C> @@ -78,8 +74,8 @@ public class Utils { } public static boolean verifyDirectFollows(DirectlyFollowsRelation df, String a1, String a2, String caseId) { - String df_a1 = XConceptExtension.instance().extractName(df.getFirst()); - String df_a2 = XConceptExtension.instance().extractName(df.getSecond()); + String df_a1 = df.getFrom().getEventName(); + String df_a2 = df.getTo().getEventName(); return df_a1.equals(a1) && df_a2.equals(a2) && df.getCaseId().equals(caseId); } } diff --git a/src/test/java/beamline/tests/UtilsTest.java b/src/test/java/beamline/tests/UtilsTest.java deleted file mode 100644 index a7fe3dce39daea414861a78254176d4f3dedfa62..0000000000000000000000000000000000000000 --- a/src/test/java/beamline/tests/UtilsTest.java +++ /dev/null @@ -1,102 +0,0 @@ -package beamline.tests; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; - -import java.sql.Date; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import org.apache.commons.lang3.tuple.Pair; -import org.deckfour.xes.extension.std.XConceptExtension; -import org.deckfour.xes.extension.std.XTimeExtension; -import org.deckfour.xes.model.XTrace; -import org.junit.jupiter.api.Test; - -import beamline.exceptions.EventException; -import beamline.utils.EventUtils; - -public class UtilsTest { - - @Test - public void test_create_event() { - String activityName = UUID.randomUUID().toString(); - String caseId = UUID.randomUUID().toString(); - XTrace t = null; - try { - t = EventUtils.create(activityName, caseId); - } catch (EventException e) { } - - assertNotNull(t); - assertEquals(XConceptExtension.instance().extractName(t), caseId); - assertEquals(XConceptExtension.instance().extractName(t.get(0)), activityName); - } - - @Test - public void test_create_event_time() { - String activityName = UUID.randomUUID().toString(); - String caseId = UUID.randomUUID().toString(); - Date date = Date.valueOf("1996-01-23"); - XTrace t = null; - try { - t = EventUtils.create(activityName, caseId, date); - } catch (EventException e) { } - - assertNotNull(t); - assertEquals(XConceptExtension.instance().extractName(t), caseId); - assertEquals(XConceptExtension.instance().extractName(t.get(0)), activityName); - assertEquals(XTimeExtension.instance().extractTimestamp(t.get(0)), date); - } - - @Test - public void test_create_event_attributes() { - String activityName = UUID.randomUUID().toString(); - String caseId = UUID.randomUUID().toString(); - Date date = Date.valueOf("1996-01-23"); - List<Pair<String, String>> attributes = new LinkedList<Pair<String, String>>(); - Map<String, String> values = new HashMap<String, String>(); - for (int i = 0; i < 10; i++) { - String attributeName = "attr-" + i; - String attributeValue = UUID.randomUUID().toString(); - values.put(attributeName, attributeValue); - attributes.add(Pair.of(attributeName, attributeValue)); - } - XTrace t = null; - try { - t = EventUtils.create(activityName, caseId, date, attributes); - } catch (EventException e) { } - - assertNotNull(t); - assertEquals(XConceptExtension.instance().extractName(t), caseId); - assertEquals(XConceptExtension.instance().extractName(t.get(0)), activityName); - assertEquals(XTimeExtension.instance().extractTimestamp(t.get(0)), date); - for(String name : t.get(0).getAttributes().keySet()) { - if (name.startsWith("attr-")) { - assertEquals(t.get(0).getAttributes().get(name).toString(), values.get(name)); - } - } - } - - @Test - public void test_no_activityname() { - assertThrows(EventException.class, () -> { - EventUtils.create(null, ""); - }); - assertThrows(EventException.class, () -> { - EventUtils.create("", null); - }); - } - - @Test - public void test_extract_name_case() throws EventException { - String activityName = UUID.randomUUID().toString(); - String caseId = UUID.randomUUID().toString(); - XTrace t = EventUtils.create(activityName, caseId); - assertEquals(activityName, EventUtils.getActivityName(t)); - assertEquals(caseId, EventUtils.getCaseId(t)); - } -} diff --git a/src/test/resources/sources/source_2.xes b/src/test/resources/sources/source_2.xes new file mode 100644 index 0000000000000000000000000000000000000000..7e04dcb19ebbe85098f4736ea503008b37004f34 --- /dev/null +++ b/src/test/resources/sources/source_2.xes @@ -0,0 +1,56 @@ +<?xml version="1.0" encoding="UTF-8"?> +<log xes.version="1849.2016" xmlns="http://www.xes-standard.org" xes.creator="Fluxicon Disco"> + <extension name="Concept" prefix="concept" uri="http://www.xes-standard.org/concept.xesext"/> + <extension name="Lifecycle" prefix="lifecycle" uri="http://www.xes-standard.org/lifecycle.xesext"/> + <global scope="trace"> + <string key="concept:name" value="name"/> + <string key="variant" value="string"/> + <int key="variant-index" value="0"/> + </global> + <global scope="event"> + <string key="concept:name" value="name"/> + <string key="lifecycle:transition" value="transition"/> + <string key="act" value="string"/> + </global> + <classifier name="Activity" keys="act"/> + <string key="lifecycle:model" value="standard"/> + <string key="creator" value="Fluxicon Disco"/> + <string key="library" value="Fluxicon Octane"/> + <trace> + <string key="concept:name" value="c1"/> + <string key="variant" value="Variant 1"/> + <int key="variant-index" value="1"/> + <string key="creator" value="Fluxicon Disco"/> + <event> + <string key="concept:name" value="a11"/> + <string key="lifecycle:transition" value="complete"/> + <string key="act" value="a11"/> + </event> + <event> + <string key="concept:name" value="a12"/> + <string key="lifecycle:transition" value="complete"/> + <string key="act" value="a12"/> + </event> + </trace> + <trace> + <string key="concept:name" value="c2"/> + <string key="variant" value="Variant 2"/> + <int key="variant-index" value="2"/> + <string key="creator" value="Fluxicon Disco"/> + <event> + <string key="concept:name" value="a21"/> + <string key="lifecycle:transition" value="complete"/> + <string key="act" value="a21"/> + </event> + <event> + <string key="concept:name" value="a22"/> + <string key="lifecycle:transition" value="complete"/> + <string key="act" value="a22"/> + </event> + <event> + <string key="concept:name" value="a23"/> + <string key="lifecycle:transition" value="complete"/> + <string key="act" value="a23"/> + </event> + </trace> +</log> \ No newline at end of file