Skip to content
Snippets Groups Projects
Commit 15a901d8 authored by Jesper Fyhr Knudsen's avatar Jesper Fyhr Knudsen
Browse files

Allowing Job's to be enqueued on the Orchestrator

parent bcf877da
Branches demo1
No related tags found
No related merge requests found
Showing
with 141 additions and 14 deletions
...@@ -16,9 +16,11 @@ public class StateGetEndpoint implements Endpoint { ...@@ -16,9 +16,11 @@ public class StateGetEndpoint implements Endpoint {
state.runningSize = executionHandler.getQueueSize() > executionHandler.getParallelism() state.runningSize = executionHandler.getQueueSize() > executionHandler.getParallelism()
? executionHandler.getParallelism() ? executionHandler.getParallelism()
: executionHandler.getQueueSize(); : executionHandler.getQueueSize();
state.hasAvailableSlots = state.runningSize < executionHandler.getParallelism();
state.queueSize = executionHandler.getQueueSize(); state.queueSize = executionHandler.getQueueSize();
state.parallelism = executionHandler.getParallelism(); state.parallelism = executionHandler.getParallelism();
state.ramUsage = (int) ((int) Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()); state.ramUsage = (int) (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory());
Response<AgentStateResult> response = new Response<>(); Response<AgentStateResult> response = new Response<>();
response.Results.add(new AgentStateResult(state)); response.Results.add(new AgentStateResult(state));
......
package com.chaos.octopus.commons.core;
public interface JobEnqueuer{
void enqueue(Job job);
}
...@@ -6,10 +6,9 @@ package com.chaos.octopus.commons.core; ...@@ -6,10 +6,9 @@ package com.chaos.octopus.commons.core;
import java.net.ConnectException; import java.net.ConnectException;
public interface Orchestrator extends AutoCloseable { public interface Orchestrator extends AutoCloseable, JobEnqueuer {
void open() throws ConnectException; void open() throws ConnectException;
void taskCompleted(Task task); void taskCompleted(Task task);
void taskUpdate(Task task); void taskUpdate(Task task);
int get_localListenPort(); int get_localListenPort();
void enqueue(Job job);
} }
...@@ -4,18 +4,27 @@ import java.util.ArrayList; ...@@ -4,18 +4,27 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
public class ClusterState { public class ClusterState {
public int jobsInQueue = 0; public String hostname;
public int port;
//public int cpuUsage;
public int ramUsage;
public int parallelism;
public boolean hasAvailableSlots;
public int queueSize;
public int runningSize;
public List<AgentState> agents = new ArrayList<>(); public List<AgentState> agents = new ArrayList<>();
public static class AgentState{ public static class AgentState{
public String state = ""; public String state = "Connected";
public boolean hasAvailableSlots;
public String hostname; public String hostname;
public int port; public int port;
public int cpuUsage;
//public int cpuUsage;
public int ramUsage; public int ramUsage;
public int parallelism;
public boolean hasAvailableSlots;
public int queueSize; public int queueSize;
public int runningSize; public int runningSize;
public int parallelism;
} }
} }
...@@ -68,6 +68,9 @@ public class AgentProxy { ...@@ -68,6 +68,9 @@ public class AgentProxy {
new TypeToken<Response<AgentStateResult>>() { new TypeToken<Response<AgentStateResult>>() {
}.getType()).Results.get(0); }.getType()).Results.get(0);
result.agentState.hostname = hostname;
result.agentState.port = port;
return result.agentState; return result.agentState;
} }
......
...@@ -12,7 +12,7 @@ import com.chaos.sdk.v6.dto.ClusterState; ...@@ -12,7 +12,7 @@ import com.chaos.sdk.v6.dto.ClusterState;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
// todo Extract enqueue logic from sending a task to an Agent by putting the execution logic on a separate Thread
public class AllocationHandler implements AutoCloseable { public class AllocationHandler implements AutoCloseable {
private ArrayList<AgentProxy> _agents = new ArrayList<>(); private ArrayList<AgentProxy> _agents = new ArrayList<>();
private ArrayList<Job> _Jobs = new ArrayList<>(); private ArrayList<Job> _Jobs = new ArrayList<>();
...@@ -32,12 +32,13 @@ public class AllocationHandler implements AutoCloseable { ...@@ -32,12 +32,13 @@ public class AllocationHandler implements AutoCloseable {
} }
public void enqueue(Job job) { public void enqueue(Job job) {
job.resume();
synchronized (_Jobs) { synchronized (_Jobs) {
for (Job j : _Jobs) for (Job j : _Jobs)
if (j.id.equals(job.id)) return; if (j.id.equals(job.id)) return;
job.print(); //job.print();
job.resume();
_Jobs.add(job); _Jobs.add(job);
......
...@@ -12,6 +12,8 @@ import com.chaos.sdk.AuthenticatedChaosClient; ...@@ -12,6 +12,8 @@ import com.chaos.sdk.AuthenticatedChaosClient;
import com.chaos.sdk.Chaos; import com.chaos.sdk.Chaos;
import java.io.IOException; import java.io.IOException;
import java.net.Inet4Address;
import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
...@@ -41,7 +43,12 @@ public class OrchestratorImpl implements Orchestrator { ...@@ -41,7 +43,12 @@ public class OrchestratorImpl implements Orchestrator {
_simpleServer.addEndpoint("Task/Update", new TaskUpdateEndpoint(this)); _simpleServer.addEndpoint("Task/Update", new TaskUpdateEndpoint(this));
_simpleServer.addEndpoint("Task/Complete", new TaskCompleteEndpoint(this)); _simpleServer.addEndpoint("Task/Complete", new TaskCompleteEndpoint(this));
_simpleServer.addEndpoint("Agent/Connect", new AgentConnectEndpoint(_AllocationHandler)); _simpleServer.addEndpoint("Agent/Connect", new AgentConnectEndpoint(_AllocationHandler));
_simpleServer.addEndpoint("Heartbeat", new HeartbeatEndpoint(_AllocationHandler)); try {
_simpleServer.addEndpoint("Heartbeat", new HeartbeatEndpoint(_AllocationHandler, listeningPort, Inet4Address.getLocalHost().getHostAddress()));
} catch (UnknownHostException e) {
_simpleServer.addEndpoint("Heartbeat", new HeartbeatEndpoint(_AllocationHandler, listeningPort, "Unknown Hostname"));
}
_simpleServer.addEndpoint("Job/Enqueue", new JobEnqueueEndpoint(this));
} }
public static OrchestratorImpl create(OctopusConfiguration config) throws IOException { public static OrchestratorImpl create(OctopusConfiguration config) throws IOException {
......
package com.chaos.octopus.server; package com.chaos.octopus.server.endpoint;
import com.chaos.octopus.commons.core.Endpoint; import com.chaos.octopus.commons.core.Endpoint;
import com.chaos.octopus.commons.core.Request; import com.chaos.octopus.commons.core.Request;
import com.chaos.octopus.commons.core.Response; import com.chaos.octopus.commons.core.Response;
import com.chaos.octopus.server.AllocationHandler;
import com.chaos.sdk.v6.dto.ClusterState; import com.chaos.sdk.v6.dto.ClusterState;
public class HeartbeatEndpoint implements Endpoint { public class HeartbeatEndpoint implements Endpoint {
private AllocationHandler allocationHandler; private AllocationHandler allocationHandler;
private final int port;
private final String hostname;
public HeartbeatEndpoint(AllocationHandler allocationHandler) { public HeartbeatEndpoint(AllocationHandler allocationHandler, int port, String hostname) {
this.allocationHandler = allocationHandler; this.allocationHandler = allocationHandler;
this.port = port;
this.hostname = hostname;
} }
public Response invoke(Request request) { public Response invoke(Request request) {
ClusterState cs = new ClusterState(); ClusterState cs = new ClusterState();
cs.jobsInQueue = allocationHandler.getQueued(); cs.port = port;
cs.hostname = hostname;
cs.queueSize = allocationHandler.getQueued();
cs.ramUsage = (int) (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory());
cs.agents = allocationHandler.getAgentStates(); cs.agents = allocationHandler.getAgentStates();
for (ClusterState.AgentState as: cs.agents) {
cs.runningSize += as.runningSize;
cs.parallelism += as.parallelism;
}
cs.hasAvailableSlots = cs.runningSize < cs.parallelism;
return new Response<>(cs); return new Response<>(cs);
} }
} }
package com.chaos.octopus.server.endpoint;
import com.chaos.octopus.commons.core.*;
import com.google.gson.Gson;
import java.util.UUID;
public class JobEnqueueEndpoint implements Endpoint {
private JobEnqueuer jobEnqueuer;
public JobEnqueueEndpoint(JobEnqueuer jobEnqueuer) {
this.jobEnqueuer = jobEnqueuer;
}
public Response invoke(Request request) {
Job job = new Gson().fromJson(request.queryString.get("job"), Job.class);
if(job.id == null || "".equals(job.id))
job.id = UUID.randomUUID().toString();
jobEnqueuer.enqueue(job);
return new Response();
}
}
...@@ -16,7 +16,7 @@ public class Heartbeat implements SynchronizationTask { ...@@ -16,7 +16,7 @@ public class Heartbeat implements SynchronizationTask {
public void action() { public void action() {
ClusterState state = new ClusterState(); ClusterState state = new ClusterState();
state.jobsInQueue = allocationHandler.getQueued(); state.queueSize = allocationHandler.getQueued();
for (AgentProxy ap : allocationHandler.getAgents()){ for (AgentProxy ap : allocationHandler.getAgents()){
ClusterState.AgentState as = ap.getState(); ClusterState.AgentState as = ap.getState();
......
...@@ -28,7 +28,7 @@ public class HeartbeatTest { ...@@ -28,7 +28,7 @@ public class HeartbeatTest {
AuthenticatedChaosClient authenticate = chaos.authenticate("90f4183870e5d60bbb1b595c10f0c48a4edb17a1befeaee3e4146a9d492f0c84"); AuthenticatedChaosClient authenticate = chaos.authenticate("90f4183870e5d60bbb1b595c10f0c48a4edb17a1befeaee3e4146a9d492f0c84");
ClusterState state = new ClusterState(); ClusterState state = new ClusterState();
state.jobsInQueue = 1; state.queueSize = 1;
state.agents = new ArrayList<>(); state.agents = new ArrayList<>();
ClusterState.AgentState agentState = new ClusterState.AgentState(); ClusterState.AgentState agentState = new ClusterState.AgentState();
agentState.state = "Disconnected"; agentState.state = "Disconnected";
......
package com.chaos.octopus.server.unit.endpoint;
import com.chaos.octopus.commons.core.*;
import com.chaos.octopus.server.endpoint.JobEnqueueEndpoint;
import com.google.gson.Gson;
import org.junit.Assert;
import org.junit.Test;
public class JobEnqueueEndpointTest {
@Test
public void invoke_GivenJobWithoutId_CreateIdAndEnqueue(){
SpyJobEnqueuer jobEnqueuer = new SpyJobEnqueuer();
JobEnqueueEndpoint endpoint = new JobEnqueueEndpoint(jobEnqueuer);
Request request = new Request("Job/Enqueue");
Job job = new Job();
Step step = new Step();
Task task = new Task();
task.pluginId = "com.chaos.octopus.agent.unit.TestPlugin, 1.0.0";
task.properties.put("sleep", "10");
step.tasks.add(task);
job.steps.add(step);
request.queryString.put("job", new Gson().toJson(job));
endpoint.invoke(request);
Assert.assertNotNull(jobEnqueuer.job);
Assert.assertNotNull(jobEnqueuer.job.id);
Assert.assertNotEquals("", jobEnqueuer.job.id);
}
class SpyJobEnqueuer implements JobEnqueuer{
public Job job;
public void enqueue(Job job) {
this.job = job;
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment