Skip to content
Snippets Groups Projects
Commit bf3085e3 authored by Jesper Fyhr Knudsen's avatar Jesper Fyhr Knudsen
Browse files

Add message when an Agent connect to the orchestrator

parent df4e4768
Branches
No related tags found
No related merge requests found
...@@ -12,13 +12,13 @@ import java.net.Inet4Address; ...@@ -12,13 +12,13 @@ import java.net.Inet4Address;
import java.net.Socket; import java.net.Socket;
import java.net.UnknownHostException; import java.net.UnknownHostException;
public class OrchestratorProxy implements Orchestrator { class OrchestratorProxy implements Orchestrator {
private final int port; private final int port;
private final String hostname; private final String hostname;
private int _localListenPort; private int _localListenPort;
private String _localHostAddress; private String _localHostAddress;
public OrchestratorProxy(String hostname, int port, int listenPort) { OrchestratorProxy(String hostname, int port, int listenPort) {
_localHostAddress = getHostAddress(); _localHostAddress = getHostAddress();
_localListenPort = listenPort; _localListenPort = listenPort;
this.port = port; this.port = port;
...@@ -41,6 +41,8 @@ public class OrchestratorProxy implements Orchestrator { ...@@ -41,6 +41,8 @@ public class OrchestratorProxy implements Orchestrator {
sendResponse("Agent/Connect", sendResponse("Agent/Connect",
new KeyValue("hostname", _localHostAddress), new KeyValue("hostname", _localHostAddress),
new KeyValue("port", _localListenPort + "")); new KeyValue("port", _localListenPort + ""));
System.out.println("Connected to orchestrator");
} }
public void taskCompleted(Task task) { public void taskCompleted(Task task) {
......
...@@ -19,7 +19,7 @@ import java.util.List; ...@@ -19,7 +19,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
public class AgentProxy { public class AgentProxy {
public final int port; private final int port;
private final String hostname; private final String hostname;
private List<String> _SupportedPlugins; private List<String> _SupportedPlugins;
private int _MaxNumberOfSimultaneousTasks; private int _MaxNumberOfSimultaneousTasks;
...@@ -63,7 +63,7 @@ public class AgentProxy { ...@@ -63,7 +63,7 @@ public class AgentProxy {
sendRequest("Task/Enqueue", new KeyValue("task", taskString)); sendRequest("Task/Enqueue", new KeyValue("task", taskString));
} }
public ClusterState.AgentState getAgentState() throws DisconnectError { ClusterState.AgentState getAgentState() throws DisconnectError {
AgentStateResult result = (AgentStateResult) sendRequest("State/Get", AgentStateResult result = (AgentStateResult) sendRequest("State/Get",
new TypeToken<Response<AgentStateResult>>() { new TypeToken<Response<AgentStateResult>>() {
}.getType()).Results.get(0); }.getType()).Results.get(0);
...@@ -74,11 +74,11 @@ public class AgentProxy { ...@@ -74,11 +74,11 @@ public class AgentProxy {
return result.agentState; return result.agentState;
} }
public void taskCompleted(Task task) { void taskCompleted(Task task) {
_AllocatedTasks.remove(task.taskId); _AllocatedTasks.remove(task.taskId);
} }
public boolean isQueueFull() { boolean isQueueFull() {
return _MaxNumberOfSimultaneousTasks - _AllocatedTasks.size() == 0; return _MaxNumberOfSimultaneousTasks - _AllocatedTasks.size() == 0;
} }
...@@ -106,7 +106,7 @@ public class AgentProxy { ...@@ -106,7 +106,7 @@ public class AgentProxy {
return state; return state;
} }
public String getHostname() { String getHostname() {
return hostname; return hostname;
} }
...@@ -132,10 +132,14 @@ public class AgentProxy { ...@@ -132,10 +132,14 @@ public class AgentProxy {
} }
private <T> Response<T> sendRequest(String message, Type t, int retries) { private <T> Response<T> sendRequest(String message, Type t, int retries) {
try (Socket socket = new Socket(hostname, port)) { try (Socket socket = new Socket(hostname, port)) {
socket.getOutputStream().write(message.getBytes()); socket.getOutputStream().write(message.getBytes());
while (socket.getInputStream().available() == 0) { long timeout = System.currentTimeMillis() + 5000;
while (socket.getInputStream().available() == 0 && timeout > System.currentTimeMillis()) {
Thread.sleep(1);
} }
String reaponseString = ""; String reaponseString = "";
...@@ -155,7 +159,7 @@ public class AgentProxy { ...@@ -155,7 +159,7 @@ public class AgentProxy {
"Connection to Orchestrator could not be established, check hostname and port", e); "Connection to Orchestrator could not be established, check hostname and port", e);
} catch (Exception e) { } catch (Exception e) {
if (retries > 0) { if (retries > 0) {
sleep(250); sleep(100);
sendRequest(message, t, --retries); sendRequest(message, t, --retries);
} }
......
...@@ -24,6 +24,8 @@ public class AgentConnectEndpoint implements Endpoint { ...@@ -24,6 +24,8 @@ public class AgentConnectEndpoint implements Endpoint {
ap.InitializeAgent(); ap.InitializeAgent();
allocationHandler.addAgent(ap); allocationHandler.addAgent(ap);
System.out.println("Agent Connected");
} catch (ConnectException e) { } catch (ConnectException e) {
System.err.println("Connection to Agent could not be established, hostname: " + hostname + ", port: " + port); System.err.println("Connection to Agent could not be established, hostname: " + hostname + ", port: " + port);
e.printStackTrace(); e.printStackTrace();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment