Skip to content
Snippets Groups Projects
Commit caa17251 authored by Andrea Burattin's avatar Andrea Burattin
Browse files

Working on algorithm test

parent c8298f7d
Branches
No related tags found
No related merge requests found
package beamline.models.algorithms;
import java.io.IOException;
import java.io.Serializable;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import beamline.events.BEvent;
import beamline.models.responses.Response;
/**
* This abstract class defines the root of the mining algorithms hierarchy. It
......@@ -18,30 +19,22 @@ import beamline.events.BEvent;
*
* @author Andrea Burattin
*/
public abstract class StreamMiningAlgorithm extends RichMapFunction<BEvent, Serializable> {
public abstract class StreamMiningAlgorithm<T extends Response> extends RichFlatMapFunction<BEvent, T> {
private static final long serialVersionUID = 10170817098305999L;
private transient ValueState<Long> processedEvents;
private transient ValueState<Serializable> latestResponse;
private transient HookEventProcessing onBeforeEvent = null;
private transient HookEventProcessing onAfterEvent = null;
@Override
public void open(Configuration parameters) throws Exception {
processedEvents = getRuntimeContext().getState(new ValueStateDescriptor<>("processede vents", Long.class));
latestResponse = getRuntimeContext().getState(new ValueStateDescriptor<>("latest response", Serializable.class));
processedEvents = getRuntimeContext().getState(new ValueStateDescriptor<>("processed-events", Long.class));
}
@Override
public Serializable map(BEvent t) throws Exception {
if (onBeforeEvent != null) {
onBeforeEvent.trigger();
public void flatMap(BEvent event, Collector<T> out) throws Exception {
T latestResponse = process(event);
if (latestResponse != null) {
out.collect(latestResponse);
}
process(t);
if (onAfterEvent != null) {
onAfterEvent.trigger();
}
return getLatestResponse();
}
/**
......@@ -51,8 +44,9 @@ public abstract class StreamMiningAlgorithm extends RichMapFunction<BEvent, Seri
*
* @param event the new event being observed
* @return the result of the mining of the event
* @throws Exception
*/
public abstract Serializable ingest(BEvent event);
public abstract T ingest(BEvent event) throws Exception;
/**
* Returns the total number of events processed so far
......@@ -71,65 +65,25 @@ public abstract class StreamMiningAlgorithm extends RichMapFunction<BEvent, Seri
return -1;
}
/**
* Returns the latest result of the mining
*
* @return the latest result of the mining
*/
public Serializable getLatestResponse() {
try {
return latestResponse.value();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
/**
* This method can be used to set a hook to a callback function to be
* executed before an event is processed
*
* @param onBeforeEvent the callback function
*/
public void setOnBeforeEvent(HookEventProcessing onBeforeEvent) {
this.onBeforeEvent = onBeforeEvent;
}
/**
* This method can be used to set a hook to a callback function to be
* executed after an event is processed
*
* @param onAfterEvent the callback function
*/
public void setOnAfterEvent(HookEventProcessing onAfterEvent) {
this.onAfterEvent = onAfterEvent;
}
/*
* The internal processor in charge of updating the internal status of the
* map.
*/
protected void process(BEvent event) {
protected T process(BEvent event) throws Exception {
try {
long value = 1;
if (processedEvents.value() != null) {
value = processedEvents.value() + 1;
}
processedEvents.update(value);
latestResponse.update(ingest(event));
} catch (IOException e) {
e.printStackTrace();
}
T tmp = ingest(event);
if (tmp != null) {
tmp.setProcessedEvents(getProcessedEvents());
}
/*
* Setter of the latest response onto the status.
*/
protected void setLatestResponse(Serializable latestResponse) {
try {
this.latestResponse.update(latestResponse);
} catch (IOException e) {
e.printStackTrace();
}
return tmp;
}
}
......@@ -8,7 +8,9 @@ import beamline.graphviz.Dot;
*
* @author Andrea Burattin
*/
public interface GraphvizResponse extends Response {
public abstract class GraphvizResponse extends Response {
private static final long serialVersionUID = 7232727657074096321L;
/**
* Returns the Dot representation of the response
......
package beamline.models.responses;
import java.io.Serializable;
/**
* Marker interface used to define the type of the responses
*
* @author Andrea Burattin
*/
public interface Response {
public class Response implements Serializable {
private static final long serialVersionUID = 3104314256198741100L;
private Long processedEvents;
public Long getProcessedEvents() {
return processedEvents;
}
public void setProcessedEvents(Long processedEvents) {
this.processedEvents = processedEvents;
}
}
package beamline.models.responses;
public class StringResponse extends Response {
private static final long serialVersionUID = 7863665787098981704L;
private String str;
public StringResponse(String str) {
set(str);
}
public String get() {
return str;
}
public void set(String str) {
this.str = str;
}
@Override
public String toString() {
return str;
}
}
......@@ -6,10 +6,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
......@@ -23,11 +25,13 @@ import org.junit.jupiter.api.Test;
import beamline.events.BEvent;
import beamline.models.algorithms.StreamMiningAlgorithm;
import beamline.models.responses.Response;
import beamline.models.responses.StringResponse;
import beamline.sources.CSVLogSource;
public class AlgorithmTest {
// private OneInputStreamOperatorTestHarness<String, Long> testHarness;
private StreamMiningAlgorithm statefulFlatMapFunction;
// private StreamMiningAlgorithm statefulFlatMapFunction;
// @BeforeEach
// public void setupTestHarness() throws Exception {
......@@ -47,21 +51,62 @@ public class AlgorithmTest {
@Test
public void test_result() throws Exception {
StreamMiningAlgorithm m = new StreamMiningAlgorithm() {
private static final long serialVersionUID = 3268754545347297698L;
// List<String> acts = new LinkedList<>();
// List<String> caseIds = new LinkedList<>();
// CSVLogSource source = new CSVLogSource("src/test/resources/sources/source.csv", 0, 1);
//
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// DataStream<BEvent> stream = env.addSource(source);
// stream.executeAndCollect().forEachRemaining((BEvent e) -> {
// acts.add(e.getEventName());
// caseIds.add(e.getTraceName());
// });
//
// assertEquals(5, acts.size());
// assertEquals(5, caseIds.size());
//
// assertThat(acts, hasItems("a11","a21","a22","a12","a23"));
// assertThat(caseIds, hasItems("c1","c2","c2","c1","c2"));
StreamMiningAlgorithm<StringResponse> m = new StreamMiningAlgorithm<StringResponse>() {
@Override
public Serializable ingest(BEvent event) {
int product = 1;
if (getLatestResponse() != null) {
product = (int) getLatestResponse();
}
product *= Integer.parseInt(event.getEventName());
setLatestResponse(-product);
return product;
public StringResponse ingest(BEvent event) throws Exception {
return new StringResponse(event.getProcessName() + event.getEventName() + event.getTraceName());
}
};
List<String> events = new LinkedList<>();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.fromElements(
BEvent.create("p", "3", "c1"),
BEvent.create("p", "7", "c1"),
BEvent.create("p", "11", "c1"),
BEvent.create("p", "13", "c1"))
.keyBy(BEvent::getProcessName)
.flatMap(m)
.executeAndCollect().forEachRemaining((StringResponse e) -> {
events.add(e.get());
});
assertEquals(4, events.size());
assertThat(events, hasItems("p3c1","p7c1","p11c1","p13c1"));
// StreamMiningAlgorithm m = new StreamMiningAlgorithm() {
// private static final long serialVersionUID = 3268754545347297698L;
//
// @Override
// public Serializable ingest(BEvent event) {
// int product = 1;
// if (getLatestResponse() != null) {
// product = (int) getLatestResponse();
// }
// product *= Integer.parseInt(event.getEventName());
// setLatestResponse(-product);
// return product;
// }
// };
// private OneInputStreamOperatorTestHarness<BEvent, Serializable> testHarness = new OneInputStreamOperatorTestHarness<BEvent, Serializable>(m);
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.fromElements(
......@@ -73,10 +118,10 @@ public class AlgorithmTest {
// Collector<BEvent> stream = mock
System.out.println(m.getProcessedEvents());
// System.out.println(m.getProcessedEvents());
assertEquals(4l, m.getProcessedEvents());
assertEquals(3003, m.getLatestResponse());
// assertEquals(4l, m.getProcessedEvents());
// assertEquals(3003, m.getLatestResponse());
}
// @Test
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment