Skip to content
Snippets Groups Projects
Commit 65e55587 authored by Jesper Fyhr Knudsen's avatar Jesper Fyhr Knudsen
Browse files

Make Heartbeat accessible through the API

parent 7f9fd11c
No related branches found
No related tags found
No related merge requests found
Showing
with 105 additions and 63 deletions
......@@ -78,4 +78,8 @@ public class Agent implements AutoCloseable, TaskStatusChangeListener {
public int getQueueSize() {
return _executionHandler.getQueueSize();
}
public boolean getIsRunning() {
return _simpleServer.getIsRunning();
}
}
......@@ -27,20 +27,19 @@ public class Program {
}
int port = Integer.parseInt(cmd.getOptionValue("p", "44000"));
String orchestratorAddress = cmd.getOptionValue("oa", "localhost");
String orchestratorAddress = cmd.getOptionValue("a", "localhost");
if (cmd.hasOption("op")) {
if (cmd.hasOption("s")) {
System.out.println("Starting Agent");
int orchestratorport = Integer.parseInt(cmd.getOptionValue("op"));
String s = cmd.getOptionValue("s");
int orchestratorport = Integer.parseInt(s);
instanciateAgent(port, orchestratorAddress, orchestratorport);
} else{
System.out.println("Starting Orchestrator");
instanciateOrcestrator(port);
}
}
......@@ -50,6 +49,10 @@ public class Program {
agent.addPlugin(new CommandLinePlugin());
agent.addPlugin(new ChaosPlugin());
agent.open();
while (agent.getIsRunning())
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
......@@ -59,11 +62,9 @@ public class Program {
try (OrchestratorImpl leader = new OrchestratorImpl(port)) {
leader.open();
try {
Thread.sleep(60000);
} catch (InterruptedException e) {
e.printStackTrace();
}
while (leader.getIsRunning())
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
......@@ -80,9 +81,9 @@ public class Program {
private static Options createOptions() {
Options options = new Options();
options.addOption("p", "port", false, "Specify the listening port [44000]");
options.addOption("oa", "orchestrator-address", false, "Specify the IP address/hostname of the Orchestrator");
options.addOption("op", "orchestrator-port", false, "Specify the port of the Orchestrator");
options.addOption("p", "port", true, "Specify the listening port [44000]");
options.addOption("a", "orchestrator-address", true, "Specify the IP address/hostname of the Orchestrator");
options.addOption("s", "orchestrator-port", true, "Specify the port of the Orchestrator");
options.addOption("d", "debug", false, "Prints debug message to stdout");
options.addOption("h", "help", false, "Print help");
......
package com.chaos.octopus.commons.core;
import com.chaos.sdk.v6.dto.ClusterState;
import com.google.gson.Gson;
import java.util.ArrayList;
......@@ -10,11 +9,21 @@ public class Response<T> {
private static Gson _json = new Gson();
public List<T> Results = new ArrayList<>();
public Response() {}
public Response(T result) {
Results.add(result);
}
public String toJson(){
return _json.toJson(this, Response.class);
}
public class Result {
public List<String> Keys = new ArrayList<>();
public static class Error{
public String message = "Error";
public Error(String message) {
this.message = message;
}
}
}
......@@ -8,9 +8,6 @@ import java.net.URLDecoder;
import java.util.HashMap;
import java.util.Map;
/**
* Created by Jesper on 28-06-2016.
*/
public class RequestParser {
public static Request parse(String requestString) {
int startOfEndpoint = requestString.indexOf(" ") + 1;
......@@ -20,10 +17,9 @@ public class RequestParser {
String endpoint = requestString.substring(startOfEndpoint, endOfEndpoint)
.replaceAll("/$|^/", "");
Request request = new Request(endpoint);
Request request = new Request(endpoint.trim());
request.queryString = parseQueryString(requestString);
return request;
}
......
......@@ -8,6 +8,7 @@ import java.io.IOException;
import java.net.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
......@@ -51,6 +52,10 @@ public class SimpleServer implements Runnable{
_endpoints.put(route, endpoint);
}
public boolean getIsRunning() {
return _isRunning;
}
private class RequestHandler implements Runnable {
private Socket socket;
......@@ -78,15 +83,7 @@ public class SimpleServer implements Runnable{
if(_endpoints.containsKey(request.endpoint))
return _endpoints.get(request.endpoint).invoke(request);
Response res = new Response();
Response.Result result = res.new Result();
res.Results.add(result);
for (String val : request.queryString.values()) {
result.Keys.add(val);
}
return res;
return new Response<>(new Response.Error("Endpoint not found: " + request.endpoint));
}
String readRequestFromStream() throws IOException {
......
package com.chaos.sdk.v6.dto;
import java.util.ArrayList;
import java.util.List;
public class ClusterState {
public int jobsInQueue = 0;
public ArrayList<AgentState> agents = new ArrayList<>();
public List<AgentState> agents = new ArrayList<>();
public static class AgentState{
public String state = "";
......
......@@ -5,19 +5,12 @@
package com.chaos.octopus.server;
import com.chaos.octopus.commons.core.*;
import com.chaos.octopus.commons.core.message.AgentStateMessage;
import com.chaos.octopus.commons.core.message.Message;
import com.chaos.octopus.commons.core.message.TaskMessage;
import com.chaos.octopus.commons.exception.ConnectException;
import com.chaos.octopus.commons.exception.DisconnectError;
import com.chaos.octopus.commons.util.Commands;
import com.chaos.octopus.commons.util.NetworkingUtil;
import com.chaos.octopus.commons.util.StreamUtilities;
import com.chaos.sdk.v6.dto.ClusterState;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.Socket;
import java.util.ArrayList;
......@@ -43,7 +36,8 @@ public class AgentProxy {
}
public void InitializeAgent() {
Response<AgentConnectResult> response = sendRequest("Plugin/Get", new TypeToken<Response<AgentConnectResult>>(){}.getType());
Response<AgentConnectResult> response = sendRequest("Plugin/Get", new TypeToken<Response<AgentConnectResult>>() {
}.getType());
InitializeAgent(response);
}
......@@ -69,6 +63,12 @@ public class AgentProxy {
sendRequest("Task/Enqueue", new KeyValue("task", taskString));
}
public ClusterState.AgentState getAgentState() throws DisconnectError {
return (ClusterState.AgentState) sendRequest("State/Get",
new TypeToken<Response<ClusterState.AgentState>>() {
}.getType()).Results.get(0);
}
public void taskCompleted(Task task) {
_AllocatedTasks.remove(task.taskId);
}
......@@ -84,7 +84,8 @@ public class AgentProxy {
String stateString = new Gson().toJson(state);
Response<AgentStateResult> result = sendRequest("State/Get",
new TypeToken<Response<AgentStateResult>>(){}.getType(),
new TypeToken<Response<AgentStateResult>>() {
}.getType(),
new KeyValue("state", stateString));
state = result.Results.get(0).agentState;
......@@ -111,7 +112,8 @@ public class AgentProxy {
queryString += String.format("%1s=%2s&", entry.key, entry.value);
return sendRequest(String.format("GET /%1s/?%2s HTTP/1.1", endpoint, queryString),
new TypeToken<Response>(){}.getType(),
new TypeToken<Response>() {
}.getType(),
10);
}
......@@ -128,7 +130,8 @@ public class AgentProxy {
try (Socket socket = new Socket(hostname, port)) {
socket.getOutputStream().write(message.getBytes());
while (socket.getInputStream().available() == 0){}
while (socket.getInputStream().available() == 0) {
}
String reaponseString = "";
......@@ -143,14 +146,14 @@ public class AgentProxy {
return new Gson().fromJson(content, t);
} catch (java.net.ConnectException e) {
throw new com.chaos.octopus.commons.exception.ConnectException("Connection to Orchestrator could not be established, check hostname and port", e);
throw new com.chaos.octopus.commons.exception.ConnectException(
"Connection to Orchestrator could not be established, check hostname and port", e);
} catch (Exception e) {
if (retries > 0) {
sleep(250);
sendRequest(message, t, --retries);
}
// TODO This exception should be handled at a higher level
System.err.println("Couldn't connect to: " + "" + ":" + "");
e.printStackTrace();
......
......@@ -8,8 +8,10 @@ import com.chaos.octopus.commons.core.Job;
import com.chaos.octopus.commons.core.Task;
import com.chaos.octopus.commons.core.TaskState;
import com.chaos.octopus.commons.exception.ConnectException;
import com.chaos.sdk.v6.dto.ClusterState;
import java.util.ArrayList;
import java.util.List;
public class AllocationHandler implements AutoCloseable {
private ArrayList<AgentProxy> _agents = new ArrayList<>();
......@@ -134,4 +136,20 @@ public class AllocationHandler implements AutoCloseable {
public int getQueued() {
return _Jobs.size();
}
public List<ClusterState.AgentState> getAgentStates() {
List<ClusterState.AgentState> states = new ArrayList<>();
for (int i = 0; i < getAgents().size(); i++){
try{
states.add(getAgents().get(i).getAgentState());
}
catch (Error error) {
_agents.remove(i);
i--;
}
}
return states;
}
}
......@@ -3,10 +3,20 @@ package com.chaos.octopus.server;
import com.chaos.octopus.commons.core.Endpoint;
import com.chaos.octopus.commons.core.Request;
import com.chaos.octopus.commons.core.Response;
import com.chaos.sdk.v6.dto.ClusterState;
public class HeartbeatEndpoint implements Endpoint {
@Override
private AllocationHandler allocationHandler;
public HeartbeatEndpoint(AllocationHandler allocationHandler) {
this.allocationHandler = allocationHandler;
}
public Response invoke(Request request) {
return new Response();
ClusterState cs = new ClusterState();
cs.jobsInQueue = allocationHandler.getQueued();
cs.agents = allocationHandler.getAgentStates();
return new Response<>(cs);
}
}
......@@ -41,7 +41,7 @@ public class OrchestratorImpl implements Orchestrator {
_simpleServer.addEndpoint("Task/Update", new TaskUpdateEndpoint(this));
_simpleServer.addEndpoint("Task/Complete", new TaskCompleteEndpoint(this));
_simpleServer.addEndpoint("Agent/Connect", new AgentConnectEndpoint(_AllocationHandler));
_simpleServer.addEndpoint("Heartbeat", new HeartbeatEndpoint());
_simpleServer.addEndpoint("Heartbeat", new HeartbeatEndpoint(_AllocationHandler));
}
public static OrchestratorImpl create(OctopusConfiguration config) throws IOException {
......@@ -124,4 +124,7 @@ public class OrchestratorImpl implements Orchestrator {
return _synchronization;
}
public boolean getIsRunning(){
return _simpleServer.getIsRunning();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment