Skip to content
Snippets Groups Projects
Commit 6480e3b9 authored by Jesper Fyhr Knudsen's avatar Jesper Fyhr Knudsen
Browse files

Heartbeat with basic Agent information is sent each synchronization cycle

parent dccd7c15
Branches
No related tags found
No related merge requests found
Showing
with 389 additions and 162 deletions
......@@ -2,7 +2,11 @@
<project version="4">
<component name="ProjectCodeStyleSettingsManager">
<option name="PER_PROJECT_SETTINGS">
<value />
<value>
<XML>
<option name="XML_LEGACY_SETTINGS_IMPORTED" value="true" />
</XML>
</value>
</option>
<option name="PREFERRED_PROJECT_CODE_STYLE" value="Default (1)" />
</component>
......
This diff is collapsed.
......@@ -36,34 +36,6 @@ public class NetworkingUtil {
if (!handleResponse) return null;
return StreamUtilities.ReadString(socket.getInputStream());
// BufferedWriter out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
//
// out.write(message);
// out.newLine();
//
//
//
// StringBuffer result = new StringBuffer();
//
// BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
// String inputLine;
//
// while ((inputLine = in.readLine()) != null)
// result.append(inputLine);
//
// return result.toString();
// OutputStream out = socket.getOutputStream();
// PrintStream ps = new PrintStream(out);
// ps.println(message);
// ps.flush();
//
// if(handleResponse)
// {
// InputStream in = socket.getInputStream();
// return StreamUtilities.ReadString(in);
// }
}
} catch (ConnectException e) {
throw new com.chaos.octopus.commons.exception.ConnectException("Connection to Orchestrator could not be established, check hostname and port", e);
......@@ -96,4 +68,12 @@ public class NetworkingUtil {
}
}
public int get_port() {
return _port;
}
public String get_hostname() {
return _hostname;
}
}
......@@ -7,6 +7,7 @@ package com.chaos.sdk;
import com.chaos.octopus.commons.core.Job;
import com.chaos.octopus.commons.exception.InsufficientPermissionsException;
import com.chaos.sdk.model.McmObject;
import com.chaos.sdk.v6.dto.ClusterState;
import com.chaos.sdk.v6.dto.PortalResponse;
import com.google.gson.Gson;
......@@ -74,9 +75,11 @@ public class AuthenticatedChaosClient implements HeartbeatGateway{
}
@Override
public void set() {
public void set(ClusterState heartbeat) {
try {
PortalResponse response = gateway.call("POST", "v6/Heartbeat/Set", "sessionGUID=" + sessionId);
String json = gson.toJson(heartbeat);
System.out.println(json);
PortalResponse response = gateway.call("POST", "v6/Heartbeat/Set", "sessionGUID=" + sessionId + "&state=" + json);
if("Chaos.Portal.Core.Exceptions.InsufficientPermissionsException".equals(response.Error.Fullname))
throw new InsufficientPermissionsException();
......
package com.chaos.sdk;
import com.chaos.sdk.v6.dto.ClusterState;
public interface HeartbeatGateway {
void set();
void set(ClusterState heartbeat);
}
package com.chaos.sdk.v6.dto;
import java.util.ArrayList;
public class ClusterState {
public int jobsInQueue = 0;
public ArrayList<AgentState> agents = new ArrayList<>();
public static class AgentState{
public String state = "";
public boolean hasAvailableSlots;
public String hostname;
public int port;
}
}
......@@ -5,7 +5,7 @@ import com.chaos.octopus.commons.exception.InsufficientPermissionsException;
import com.chaos.sdk.Chaos;
import com.chaos.sdk.model.McmObject;
import com.chaos.sdk.AuthenticatedChaosClient;
import junit.framework.Assert;
import com.chaos.sdk.v6.dto.ClusterState;
import org.junit.Test;
import java.io.IOException;
......@@ -22,7 +22,7 @@ public class ChaosTest {
@Test
public void authenticate_GivenValidKey_ReturnAuthenticatedClient() throws IOException {
String key = "somekey";
MockGateway gateway = new MockGateway("v6/SiteAccess/Auth?apiKey=" + key, "{\"Header\": {\"Duration\": 177.4577},\"Body\": {\"Count\": 1,\"TotalCount\": 1,\"Results\": [{\"Guid\": \"d01755b9-e019-4d7c-98d2-ca4583196a4f\",\"UserGuid\": \"33333633-3136-3433-2d33-3633382d3633\",\"DateCreated\": 1382903500,\"DateModified\": 1382903500,\"FullName\": \"Chaos.Mcm.Data.Dto.Object\"}]},\"Error\": {\"Fullname\": null,\"Message\": null,\"InnerException\": null}}");
MockGateway gateway = new MockGateway("v6/AuthKey/Login?token=" + key, "{\"Header\": {\"Duration\": 177.4577},\"Body\": {\"Count\": 1,\"TotalCount\": 1,\"Results\": [{\"Guid\": \"d01755b9-e019-4d7c-98d2-ca4583196a4f\",\"UserGuid\": \"33333633-3136-3433-2d33-3633382d3633\",\"DateCreated\": 1382903500,\"DateModified\": 1382903500,\"FullName\": \"Chaos.Mcm.Data.Dto.Object\"}]},\"Error\": {\"Fullname\": null,\"Message\": null,\"InnerException\": null}}");
Chaos api = new Chaos(gateway);
AuthenticatedChaosClient result = api.authenticate(key);
......@@ -52,7 +52,7 @@ public class ChaosTest {
String metadataXml = "somemetadataXml";
String sessionGuid = "someguid";
String response = "{\"Header\": {\"Duration\": 177.4577},\"Body\": {\"Count\": 1,\"TotalCount\": 1,\"Results\": [{\"Value\": \"1\",\"FullName\": \"Chaos.Mcm.Data.Dto.ScalarResult\"}]},\"Error\": {\"Fullname\": null,\"Message\": null,\"InnerException\": null}}";
MockGateway gateway = new MockGateway("v6/Metadata/set?sessionGUID=" + sessionGuid + "&objectGuid=someobjectGuid&metadataSchemaGuid=somemetadataSchemaGuid&languageCode=somelanguageCode&revisionID=somerevisionID&metadataXml=somemetadataXml", response);
MockGateway gateway = new MockGateway("v6/Metadata/Set?sessionGUID=" + sessionGuid + "&objectGuid=someobjectGuid&metadataSchemaGuid=somemetadataSchemaGuid&languageCode=somelanguageCode&revisionID=somerevisionID&metadataXml=somemetadataXml", response);
Chaos api = new Chaos(gateway);
int result = api.metadataSet(sessionGuid, objectGuid, metadataSchemaGuid, languageCode, revisionID, metadataXml);
......@@ -77,19 +77,19 @@ public class ChaosTest {
public void set_GivenInvalidSession_Throw() throws IOException {
String sessionGuid = "someguid";
String response = "{\"Header\": {\"Duration\": 177.4577},\"Body\": {\"Count\": 0,\"TotalCount\": 0,\"Results\": []},\"Error\": {\"Fullname\": \"Chaos.Portal.Core.Exceptions.InsufficientPermissionsException\",\"Message\": \"\",\"InnerException\": null}}";
MockGateway gateway = new MockGateway("v6/Heartbeat/Set?sessionGUID=" + sessionGuid + "", response);
MockGateway gateway = new MockGateway("v6/Heartbeat/Set?sessionGUID=" + sessionGuid + "&state={\"ConnectedAgents\":0,\"jobsInQueue\":0}", response);
AuthenticatedChaosClient api = new AuthenticatedChaosClient(gateway, sessionGuid);
api.set();
api.set(new ClusterState());
}
@Test
public void set_GivenValidSession_ReturnTrue() throws IOException {
String sessionGuid = "someguid";
String response = "{\"Header\": {\"Duration\": 177.4577},\"Body\": {\"Count\": 1,\"TotalCount\": 1,\"Results\": [{\"WasSucess\":true}]},\"Error\": {\"Fullname\": null,\"Message\": null,\"InnerException\": null}}";
MockGateway gateway = new MockGateway("v6/Heartbeat/Set?sessionGUID=" + sessionGuid + "", response);
MockGateway gateway = new MockGateway("v6/Heartbeat/Set?sessionGUID=" + sessionGuid + "&state={\"ConnectedAgents\":0,\"jobsInQueue\":0}", response);
AuthenticatedChaosClient api = new AuthenticatedChaosClient(gateway, sessionGuid);
api.set();
api.set(new ClusterState());
}
}
......@@ -24,6 +24,7 @@ public class AgentProxy {
private int _MaxNumberOfSimultaneousTasks;
private Map<String, Task> _AllocatedTasks;
private NetworkingUtil _network;
private Boolean _isConnected;
public AgentProxy(String hostname, int port) {
_AllocatedTasks = new HashMap<>();
......@@ -68,7 +69,13 @@ public class AgentProxy {
if (!parsedResponse.getAction().equals("OK")) throw new IOException("Agent didnt queue task");
_AllocatedTasks.put(task.taskId, task);
} catch (Exception e) {
_isConnected = true;
}catch (DisconnectError e){
_isConnected = false;
throw e;
}
catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
......@@ -84,4 +91,12 @@ public class AgentProxy {
return _MaxNumberOfSimultaneousTasks - _AllocatedTasks.size() == 0;
}
}
public boolean get_IsConnected() {
return _isConnected;
}
public NetworkingUtil get_network() {
return _network;
}
}
......@@ -80,14 +80,14 @@ public class AllocationHandler implements AutoCloseable {
}
}
public void taskUpdate(Task task) {
public synchronized void taskUpdate(Task task) {
Job job = getJob(task);
if (!job.isComplete())
job.replaceTask(task);
}
public void taskComplete(Task task) {
public synchronized void taskComplete(Task task) {
taskUpdate(task);
for (AgentProxy agent : getAgents())
......
......@@ -56,7 +56,7 @@ public class OrchestratorImpl implements Orchestrator, Runnable {
AuthenticatedChaosClient client = chaos.authenticate(config.getChaosApiKey());
sync.addSynchronizationTask(new UpdateJob(queue, client));
sync.addSynchronizationTask(new EnqueueJobs(leader, client));
sync.addSynchronizationTask(new Heartbeat(client));
sync.addSynchronizationTask(new Heartbeat(leader._AllocationHandler, client));
return leader;
}
......
package com.chaos.octopus.server.synchronization;
import com.chaos.octopus.server.AgentProxy;
import com.chaos.octopus.server.AllocationHandler;
import com.chaos.sdk.HeartbeatGateway;
import com.chaos.sdk.v6.dto.ClusterState;
public class Heartbeat implements SynchronizationTask {
private final AllocationHandler allocationHandler;
private final HeartbeatGateway gateway;
public Heartbeat(HeartbeatGateway gateway) {
public Heartbeat(AllocationHandler allocationHandler, HeartbeatGateway gateway) {
this.allocationHandler = allocationHandler;
this.gateway = gateway;
}
public void action() {
gateway.set();
ClusterState state = new ClusterState();
state.jobsInQueue = allocationHandler.getQueued();
for (AgentProxy ap : allocationHandler.getAgents()){
ClusterState.AgentState as = new ClusterState.AgentState();
as.state = ap.get_IsConnected() ? "Connected" : "Disconnected";
as.hasAvailableSlots = !ap.isQueueFull();
as.hostname = ap.get_network().get_hostname();
as.port = ap.get_network().get_port();
state.agents.add(as);
}
gateway.set(state);
}
}
package com.chaos.octopus.server.unit.Synchronization;
import com.chaos.octopus.server.AllocationHandler;
import com.chaos.octopus.server.synchronization.Heartbeat;
import com.chaos.sdk.*;
import junit.framework.Assert;
import com.chaos.sdk.v6.dto.ClusterState;
import org.junit.*;
import java.io.IOException;
import java.util.ArrayList;
import static org.junit.Assert.assertTrue;
public class HeartbeatTest {
@Test
public void action() {
ChaosMock chaos = new ChaosMock();
Heartbeat hb = new Heartbeat(chaos);
Heartbeat hb = new Heartbeat(new AllocationHandler(), chaos);
hb.action();
Assert.assertTrue(chaos.wasHeartbeatSet);
assertTrue(chaos.wasHeartbeatSet);
}
@Ignore
@Test
public void sdfsdf() throws IOException {
Chaos chaos = new Chaos("https://dev-api.cosound.dk/v6");
AuthenticatedChaosClient authenticate = chaos.authenticate("90f4183870e5d60bbb1b595c10f0c48a4edb17a1befeaee3e4146a9d492f0c84");
authenticate.set();
ClusterState state = new ClusterState();
state.jobsInQueue = 1;
state.agents = new ArrayList<>();
ClusterState.AgentState agentState = new ClusterState.AgentState();
agentState.state = "Disconnected";
state.agents.add(agentState);
authenticate.set(state);
}
private class ChaosMock implements HeartbeatGateway {
public boolean wasHeartbeatSet = false;
@Override
public void set() {
public void set(ClusterState heartbeat) {
wasHeartbeatSet = true;
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment