Skip to content
Snippets Groups Projects
Commit 7f9fd11c authored by Jesper Fyhr Knudsen's avatar Jesper Fyhr Knudsen
Browse files

Merge branch 'master' of bitbucket.org:chaosaps/octopus

Conflicts:
	octopus-java/.idea/workspace.xml
parents 5115f06a 354a307c
No related branches found
No related tags found
No related merge requests found
Showing
with 276 additions and 1886 deletions
......@@ -2,3 +2,4 @@
.classpath
.project
build/
octopus-java/.idea/workspace.xml
This diff is collapsed.
......@@ -186,7 +186,7 @@
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright 2014 Chaos ApS
Copyright 2016 Chaos ApS
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
......
......@@ -4,30 +4,18 @@
*/
package com.chaos.octopus.agent;
import com.chaos.octopus.agent.action.AgentAction;
import com.chaos.octopus.agent.action.AgentStateAction;
import com.chaos.octopus.agent.action.EnqueueTaskAction;
import com.chaos.octopus.agent.action.ListSupportedPluginsAction;
import com.chaos.octopus.agent.endpoint.*;
import com.chaos.octopus.commons.core.*;
import com.chaos.octopus.commons.core.message.Message;
import com.chaos.octopus.commons.exception.DisconnectError;
import com.chaos.octopus.commons.util.Commands;
import com.chaos.octopus.commons.util.StreamUtilities;
import com.chaos.octopus.commons.http.SimpleServer;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
public class Agent implements Runnable, AutoCloseable, TaskStatusChangeListener {
private boolean _isRunning;
private Thread _thread;
public class Agent implements AutoCloseable, TaskStatusChangeListener {
private ExecutionHandler _executionHandler;
private Orchestrator _orchestrator;
private ServerSocket _Server;
private Map<String, AgentAction> _agentActions = new HashMap<>();
private PluginFactory _pluginFactory;
private SimpleServer _simpleServer;
public Agent(String orchestratorHostname, int orchestratorPort, int listenPort) {
this(new OrchestratorProxy(orchestratorHostname, orchestratorPort, listenPort), 4);
......@@ -38,17 +26,13 @@ public class Agent implements Runnable, AutoCloseable, TaskStatusChangeListener
}
public Agent(Orchestrator orchestrator, int parallelism) {
_orchestrator = orchestrator;
_isRunning = false;
_thread = new Thread(this);
_thread.setName("Agent");
_executionHandler = new ExecutionHandler(this, parallelism);
_pluginFactory = new PluginFactory();
_agentActions.put(Commands.LIST_SUPPORTED_PLUGINS, new ListSupportedPluginsAction(_pluginFactory, parallelism));
_agentActions.put(Commands.ENQUEUE_TASK, new EnqueueTaskAction(this));
_agentActions.put(Commands.AGENT_STATE, new AgentStateAction(_executionHandler));
_orchestrator = orchestrator;
_simpleServer = new SimpleServer(_orchestrator.get_localListenPort());
_simpleServer.addEndpoint("Task/Enqueue", new TaskEnqueueEndpoint(this));
_simpleServer.addEndpoint("State/Get", new StateGetEndpoint(_executionHandler));
_simpleServer.addEndpoint("Plugin/Get", new PluginGetEndpoint(_executionHandler, _pluginFactory));
}
public static Agent create(OctopusConfiguration config) {
......@@ -58,10 +42,6 @@ public class Agent implements Runnable, AutoCloseable, TaskStatusChangeListener
public void open() throws IOException {
try {
_orchestrator.open();
_Server = new ServerSocket(_orchestrator.get_localListenPort());
_isRunning = true;
_thread.start();
} catch (DisconnectError e) {
System.out.println(Thread.currentThread().getId() + " AGENT DISCONNECTED");
try {
......@@ -70,30 +50,10 @@ public class Agent implements Runnable, AutoCloseable, TaskStatusChangeListener
e.printStackTrace();
}
}
}
public void run() {
while (_isRunning) {
try {
// todo refactor so the implementation doesn't depend on the socket
try (Socket socket = _Server.accept()) {
String message = StreamUtilities.ReadString(socket.getInputStream());
Message msg = StreamUtilities.ReadJson(message, Message.class);
_agentActions.get(msg.getAction()).invoke(message, socket.getOutputStream());
}
} catch (Exception e) {
if (!_Server.isClosed()) e.printStackTrace();
}
}
}
public void close() throws Exception {
_isRunning = false;
if (_Server != null) _Server.close();
_simpleServer.stop();
if (_executionHandler != null) _executionHandler.close();
}
......@@ -115,7 +75,6 @@ public class Agent implements Runnable, AutoCloseable, TaskStatusChangeListener
public void onTaskUpdate(Task task) {
_orchestrator.taskUpdate(task);
}
public int getQueueSize() {
return _executionHandler.getQueueSize();
}
......
......@@ -32,6 +32,7 @@ public class ExecutionHandler implements AutoCloseable, TaskCompleteListener {
public void enqueue(Plugin plugin) {
_currentQueueSize.incrementAndGet();
ExecutionSlot slot = new ExecutionSlot(plugin);
slot.addTaskCompleteListener(this);
slot.addTaskUpdateListener(_taskStatusChangedListener);
......
......@@ -5,24 +5,24 @@
package com.chaos.octopus.agent;
import com.chaos.octopus.commons.core.*;
import com.chaos.octopus.commons.core.message.ConnectMessage;
import com.chaos.octopus.commons.core.message.TaskMessage;
import com.chaos.octopus.commons.util.Commands;
import com.chaos.octopus.commons.util.NetworkingUtil;
import com.google.gson.Gson;
import java.net.ConnectException;
import java.net.Inet4Address;
import java.net.Socket;
import java.net.UnknownHostException;
public class OrchestratorProxy implements Orchestrator {
private final int port;
private final String hostname;
private int _localListenPort;
private String _localHostAddress;
private NetworkingUtil _network;
public OrchestratorProxy(String hostname, int port, int listenPort) {
_localHostAddress = getHostAddress();
_localListenPort = listenPort;
_network = new NetworkingUtil(hostname, port);
this.port = port;
this.hostname = hostname;
}
private String getHostAddress() {
......@@ -38,22 +38,55 @@ public class OrchestratorProxy implements Orchestrator {
}
public void open() throws ConnectException {
ConnectMessage msg = new ConnectMessage(_localHostAddress, _localListenPort);
_network.send(msg.toJson());
sendResponse("Agent/Connect",
new KeyValue("hostname", _localHostAddress),
new KeyValue("port", _localListenPort + ""));
}
public void taskCompleted(Task task) {
TaskMessage msg = new TaskMessage(Commands.TASK_DONE, task);
String taskString = new Gson().toJson(task);
_network.send(msg.toJson());
sendResponse("Task/Complete", new KeyValue("task", taskString));
}
@Override
public void taskUpdate(Task task) {
TaskMessage msg = new TaskMessage(Commands.TASK_UPDATE, task);
String taskString = new Gson().toJson(task);
sendResponse("Task/Update", new KeyValue("task", taskString));
}
private void sendResponse(String endpoint, KeyValue... parameters) {
String queryString = "";
for (KeyValue entry: parameters)
queryString += String.format("%1s=%2s&", entry.key, entry.value);
sendResponse(String.format("GET /%1s/?%2s HTTP/1.1", endpoint, queryString), 10);
}
_network.send(msg.toJson());
private void sendResponse(String message, int retries) {
try(Socket socket = new Socket(hostname, port)) {
socket.getOutputStream().write(message.getBytes());
} catch (ConnectException e) {
throw new com.chaos.octopus.commons.exception.ConnectException("Connection to Orchestrator could not be established, check hostname and port", e);
} catch (Exception e) {
if (retries > 0) {
sleep(250);
sendResponse(message, --retries);
}
// TODO This exception should be handled at a higher level
System.err.println("Couldn't connect to: " + "" + ":" + "");
e.printStackTrace();
}
}
private void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e1) {
}
}
@Override
......
package com.chaos.octopus.agent.action;
import java.io.IOException;
import java.io.OutputStream;
public interface AgentAction {
void invoke(String message, OutputStream out) throws IOException;
}
package com.chaos.octopus.agent.action;
import com.chaos.octopus.agent.Agent;
import com.chaos.octopus.commons.core.message.Message;
import com.chaos.octopus.commons.core.message.TaskMessage;
import com.chaos.octopus.commons.util.NetworkingUtil;
import com.chaos.octopus.commons.util.StreamUtilities;
import java.io.IOException;
import java.io.OutputStream;
public class EnqueueTaskAction implements AgentAction {
private final Agent agent;
public EnqueueTaskAction(Agent agent) {
this.agent = agent;
}
public void invoke(String message, OutputStream out) throws IOException {
TaskMessage enqueueTask = StreamUtilities.ReadJson(message, TaskMessage.class);
agent.enqueue(enqueueTask.getTask());
NetworkingUtil.send(Message.createWithAction("OK").toJson(), out);
}
}
package com.chaos.octopus.agent.action;
package com.chaos.octopus.agent.endpoint;
import com.chaos.octopus.agent.Agent;
import com.chaos.octopus.agent.ExecutionHandler;
import com.chaos.octopus.agent.PluginFactory;
import com.chaos.octopus.commons.core.AgentConfigurationMessage;
import com.chaos.octopus.commons.core.PluginDefinition;
import com.chaos.octopus.commons.util.NetworkingUtil;
import com.chaos.octopus.commons.core.*;
import java.io.IOException;
import java.io.OutputStream;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
public class ListSupportedPluginsAction implements AgentAction {
private final PluginFactory pluginFactory;
private final int parallelism;
/**
* Created by Jesper on 30-06-2016.
*/
public class PluginGetEndpoint implements Endpoint {
public ListSupportedPluginsAction(PluginFactory pluginFactory, int parallelism){
private ExecutionHandler executionHandler;
private PluginFactory pluginFactory;
public PluginGetEndpoint(ExecutionHandler executionHandler, PluginFactory pluginFactory) {
this.executionHandler = executionHandler;
this.pluginFactory = pluginFactory;
this.parallelism = parallelism;
}
public void invoke(String message, OutputStream out) throws IOException {
public Response invoke(Request request) {
AgentConfigurationMessage response = createAgentConfigurationMessage();
NetworkingUtil.send(response.toJson(), out);
Response res = new Response();
AgentConnectResult result = new AgentConnectResult();
res.Results.add(result);
for (String s : response.getSupportedPlugins())
result.supportedPlugins.add(s);
result.masNumberOfSimultaneousTasks = executionHandler.getParallelism();
return res;
}
private AgentConfigurationMessage createAgentConfigurationMessage() {
AgentConfigurationMessage message = new AgentConfigurationMessage();
message.setNumberOfSimulataniousTasks(parallelism);
message.setNumberOfSimulataniousTasks(executionHandler.getParallelism());
for (PluginDefinition definition : get_SupportedPlugins())
message.getSupportedPlugins().add(definition.getId());
......@@ -43,4 +56,6 @@ public class ListSupportedPluginsAction implements AgentAction {
return list;
}
}
package com.chaos.octopus.agent.action;
package com.chaos.octopus.agent.endpoint;
import com.chaos.octopus.agent.ExecutionHandler;
import com.chaos.octopus.commons.core.message.AgentStateMessage;
import com.chaos.octopus.commons.util.NetworkingUtil;
import com.chaos.octopus.commons.core.AgentStateResult;
import com.chaos.octopus.commons.core.Endpoint;
import com.chaos.octopus.commons.core.Request;
import com.chaos.octopus.commons.core.Response;
import com.chaos.sdk.v6.dto.ClusterState;
import java.io.IOException;
import java.io.OutputStream;
public class StateGetEndpoint implements Endpoint {
private ExecutionHandler executionHandler;
public class AgentStateAction implements AgentAction {
private final ExecutionHandler executionHandler;
public AgentStateAction(ExecutionHandler executionHandler) {
public StateGetEndpoint(ExecutionHandler executionHandler) {
this.executionHandler = executionHandler;
}
public void invoke(String message, OutputStream out) throws IOException {
ClusterState.AgentState state = getState();
NetworkingUtil.send(new AgentStateMessage(state), out);
}
public ClusterState.AgentState getState(){
public Response invoke(Request request) {
ClusterState.AgentState state = new ClusterState.AgentState();
state.runningSize = executionHandler.getQueueSize() > executionHandler.getParallelism()
? executionHandler.getParallelism()
......@@ -29,6 +22,9 @@ public class AgentStateAction implements AgentAction {
state.queueSize = executionHandler.getQueueSize();
state.parallelism = executionHandler.getParallelism();
return state;
Response<AgentStateResult> response = new Response<>();
response.Results.add(new AgentStateResult(state));
return response;
}
}
package com.chaos.octopus.agent.endpoint;
import com.chaos.octopus.agent.Agent;
import com.chaos.octopus.commons.core.Endpoint;
import com.chaos.octopus.commons.core.Request;
import com.chaos.octopus.commons.core.Response;
import com.chaos.octopus.commons.core.Task;
import com.chaos.octopus.commons.util.StreamUtilities;
public class TaskEnqueueEndpoint implements Endpoint {
private Agent agent;
public TaskEnqueueEndpoint(Agent agent) {
this.agent = agent;
}
public Response invoke(Request request) {
String taskJson = request.queryString.get("task");
Task task = StreamUtilities.ReadJson(taskJson, Task.class);
agent.enqueue(task);
// todo add proper OK response
return new Response();
}
}
package com.chaos.octopus.agent.unit;
import static org.junit.Assert.*;
import java.util.List;
import com.chaos.octopus.agent.ExecutionHandler;
import com.chaos.octopus.agent.PluginFactory;
import com.chaos.octopus.agent.TaskStatusChangeListener;
import com.chaos.octopus.agent.action.AgentStateAction;
import com.chaos.octopus.agent.action.EnqueueTaskAction;
import com.chaos.octopus.agent.action.ListSupportedPluginsAction;
import com.chaos.octopus.commons.core.*;
import com.chaos.sdk.v6.dto.ClusterState;
import org.junit.Test;
import com.chaos.octopus.agent.Agent;
public class AgentTest {
......
......@@ -2,11 +2,8 @@ package com.chaos.octopus.agent.unit.action;
import com.chaos.octopus.agent.ExecutionHandler;
import com.chaos.octopus.agent.TaskStatusChangeListener;
import com.chaos.octopus.agent.action.AgentStateAction;
import com.chaos.octopus.commons.core.Plugin;
import com.chaos.octopus.commons.core.Task;
import com.chaos.octopus.commons.core.TaskState;
import com.chaos.octopus.commons.core.TestPlugin;
import com.chaos.octopus.agent.endpoint.StateGetEndpoint;
import com.chaos.octopus.commons.core.*;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
......@@ -23,10 +20,11 @@ public class AgentStateActionTest{
}
}, 1);
AgentStateAction esa = new AgentStateAction(eh);
assertEquals(0, esa.getState().runningSize);
assertEquals(0, esa.getState().queueSize);
Response<AgentStateResult> response = new StateGetEndpoint(eh).invoke(new Request("State/Get"));
assertEquals(0, response.Results.get(0).agentState.runningSize);
assertEquals(0, response.Results.get(0).agentState.queueSize);
}
@Test
......@@ -40,15 +38,16 @@ public class AgentStateActionTest{
}
}, 1);
AgentStateAction esa = new AgentStateAction(eh);
Plugin p = new TestPlugin().create(Make_TestTaskThatTake10msToExecute());
eh.enqueue(p);
for (int i = 1000; i > 0 && p.getTask().get_State() != TaskState.Executing; i--)
Thread.sleep(1);
assertEquals(1, esa.getState().runningSize);
assertEquals(1, esa.getState().queueSize);
Response<AgentStateResult> response = new StateGetEndpoint(eh).invoke(new Request("State/Get"));
assertEquals("queueSize", 1, response.Results.get(0).agentState.queueSize);
assertEquals("runningSize", 1, response.Results.get(0).agentState.runningSize);
}
@Test
......@@ -62,7 +61,6 @@ public class AgentStateActionTest{
}
}, 1);
AgentStateAction esa = new AgentStateAction(eh);
Plugin p = new TestPlugin().create(Make_TestTaskThatTake10msToExecute());
eh.enqueue(p);
eh.enqueue(new TestPlugin().create(Make_TestTaskThatTake10msToExecute()));
......@@ -70,8 +68,10 @@ public class AgentStateActionTest{
for (int i = 1000; i > 0 && p.getTask().get_State() != TaskState.Executing; i--)
Thread.sleep(1);
assertEquals(1, esa.getState().runningSize);
assertEquals(2, esa.getState().queueSize);
Response<AgentStateResult> response = new StateGetEndpoint(eh).invoke(new Request("State/Get"));
assertEquals(1, response.Results.get(0).agentState.runningSize);
assertEquals(2, response.Results.get(0).agentState.queueSize);
}
protected Task Make_TestTaskThatTake10msToExecute() {
......
......@@ -40,11 +40,7 @@ public class Program {
}
try {
Thread.sleep(60000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
......@@ -60,8 +56,14 @@ public class Program {
}
private static void instanciateOrcestrator(int port) {
try (OrchestratorImpl leader = new OrchestratorImpl(56541)) {
try (OrchestratorImpl leader = new OrchestratorImpl(port)) {
leader.open();
try {
Thread.sleep(60000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
......
package com.chaos.octopus.commons.core;
import java.util.*;
public class AgentConnectResult {
public List<String> supportedPlugins = new ArrayList<>();
public int masNumberOfSimultaneousTasks = 0;
}
package com.chaos.octopus.commons.core;
import com.chaos.sdk.v6.dto.ClusterState;
public class AgentStateResult {
public ClusterState.AgentState agentState;
public AgentStateResult(ClusterState.AgentState agentState) {
this.agentState = agentState;
}
}
package com.chaos.octopus.commons.core;
/**
* Created by Jesper on 28-06-2016.
*/
public interface Endpoint {
Response invoke(Request request);
}
package com.chaos.octopus.commons.core;
public class KeyValue {
public String key;
public String value;
public KeyValue(String key, String value) {
this.key = key;
this.value = value;
}
}
package com.chaos.octopus.commons.core;
import java.util.HashMap;
import java.util.Map;
/**
* Created by Jesper on 27-06-2016.
*/
public class Request {
public String endpoint;
public Map<String, String> queryString = new HashMap<>();
public Request(String endpoint){
this.endpoint = endpoint;
}
}
package com.chaos.octopus.commons.core;
import com.chaos.sdk.v6.dto.ClusterState;
import com.google.gson.Gson;
import java.util.ArrayList;
import java.util.List;
public class Response<T> {
private static Gson _json = new Gson();
public List<T> Results = new ArrayList<>();
public String toJson(){
return _json.toJson(this, Response.class);
}
public class Result {
public List<String> Keys = new ArrayList<>();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment