Skip to content
Snippets Groups Projects
Commit e392175f authored by Jesper Fyhr Knudsen's avatar Jesper Fyhr Knudsen
Browse files

Implement syncronous Enqueue calls

with overhead below 500ms, currently around 5-20ms
parent 84972ae9
No related branches found
No related tags found
No related merge requests found
package com.chaos.octopus.commons.core;
public interface JobEnqueuer{
public interface JobQueue {
void enqueue(Job job);
}
......@@ -6,7 +6,7 @@ package com.chaos.octopus.commons.core;
import java.net.ConnectException;
public interface Orchestrator extends AutoCloseable, JobEnqueuer {
public interface Orchestrator extends AutoCloseable, JobQueue {
void open() throws ConnectException;
void taskCompleted(Task task);
void taskUpdate(Task task);
......
......@@ -16,7 +16,7 @@ public class SimpleServer implements Runnable{
private Thread _thread;
private boolean _isRunning = true;
private ServerSocket _serverSocket;
private ExecutorService _pool = Executors.newFixedThreadPool(8);
private ExecutorService _pool = Executors.newFixedThreadPool(64);
private Map<String, Endpoint> _endpoints = new HashMap<>();
public SimpleServer(int listeningPort){
......
......@@ -19,9 +19,7 @@ import java.util.ArrayList;
import java.util.List;
public class OrchestratorImpl implements Orchestrator {
// keeps track of the jobs thqt need to be updated
// listens for packets from the agents
// Parses the message and decides how to handle it
// keeps track of the jobs that need to be updated
// Contains the synchronization
private final ConcurrentJobQueue _jobsWithUpdates;
private int _port;
......@@ -81,8 +79,6 @@ public class OrchestratorImpl implements Orchestrator {
Job job = _AllocationHandler.getJob(task);
_jobsWithUpdates.put(job);
System.out.println(new Gson().toJson(task));
_AllocationHandler.taskComplete(task);
} catch (ArrayIndexOutOfBoundsException e) {
// No job found
......
......@@ -6,10 +6,10 @@ import com.google.gson.Gson;
import java.util.UUID;
public class JobEnqueueEndpoint implements Endpoint {
private JobEnqueuer jobEnqueuer;
private JobQueue jobQueue;
public JobEnqueueEndpoint(JobEnqueuer jobEnqueuer) {
this.jobEnqueuer = jobEnqueuer;
public JobEnqueueEndpoint(JobQueue jobQueue) {
this.jobQueue = jobQueue;
}
public Response invoke(Request request) {
......@@ -18,8 +18,19 @@ public class JobEnqueueEndpoint implements Endpoint {
if(job.id == null || "".equals(job.id))
job.id = UUID.randomUUID().toString();
jobEnqueuer.enqueue(job);
jobQueue.enqueue(job);
return new Response();
boolean shouldWait = request.queryString.containsKey("wait") && "true".equals(request.queryString.get("wait"));
while (shouldWait && !job.isComplete()){
try {
Thread.sleep(1);
} catch (InterruptedException e) { }
}
Response<Job> response = new Response();
response.Results.add(job);
return response;
}
}
......@@ -10,7 +10,7 @@ public class JobEnqueueEndpointTest {
@Test
public void invoke_GivenJobWithoutId_CreateIdAndEnqueue(){
SpyJobEnqueuer jobEnqueuer = new SpyJobEnqueuer();
SpyJobQueue jobEnqueuer = new SpyJobQueue();
JobEnqueueEndpoint endpoint = new JobEnqueueEndpoint(jobEnqueuer);
Request request = new Request("Job/Enqueue");
Job job = new Job();
......@@ -22,17 +22,42 @@ public class JobEnqueueEndpointTest {
job.steps.add(step);
request.queryString.put("job", new Gson().toJson(job));
endpoint.invoke(request);
Response<Job> response = endpoint.invoke(request);
Assert.assertNotNull(jobEnqueuer.job);
Assert.assertNotNull(jobEnqueuer.job.id);
Assert.assertNotEquals("", jobEnqueuer.job.id);
Assert.assertNotNull(response.Results.get(0).id);
}
class SpyJobEnqueuer implements JobEnqueuer{
@Test
public void invoke_GivenWaitIsTrue_InvokeTaskAndWaitForDone(){
SpyJobQueue jobEnqueuer = new SpyJobQueue();
JobEnqueueEndpoint endpoint = new JobEnqueueEndpoint(jobEnqueuer);
Request request = new Request("Job/Enqueue");
request.queryString.put("wait", "true");
Job job = new Job();
Step step = new Step();
Task task = new Task();
task.pluginId = "com.chaos.octopus.agent.unit.TestPlugin, 1.0.0";
task.properties.put("sleep", "10");
step.tasks.add(task);
job.steps.add(step);
request.queryString.put("job", new Gson().toJson(job));
Response<Job> response = endpoint.invoke(request);
Assert.assertNotNull(jobEnqueuer.job);
Assert.assertNotNull(jobEnqueuer.job.id);
Assert.assertNotEquals("", jobEnqueuer.job.id);
Assert.assertTrue(response.Results.get(0).isComplete());
}
class SpyJobQueue implements JobQueue {
public Job job;
public void enqueue(Job job) {
job.steps.get(0).tasks.get(0).set_State(TaskState.Committed);
this.job = job;
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment