Skip to content
Snippets Groups Projects
Commit 78455c49 authored by Jesper Fyhr Knudsen's avatar Jesper Fyhr Knudsen
Browse files

Possible fix of the update race condition.

parent c984f823
Branches
No related tags found
No related merge requests found
Showing
with 245 additions and 71 deletions
......@@ -49,6 +49,11 @@ public class Agent implements Runnable, AutoCloseable, TaskUpdatedListener
_executionHandler = new ExecutionHandler(this, parrallelism);
}
public static Agent create(OctopusConfiguration config)
{
return new Agent(config.getOrchestratorIp(), config.getOrchestratorPort(), config.getListeningPort(), config.getNumberOfParallelTasks());
}
public void open() throws IOException
{
try
......
......@@ -39,7 +39,7 @@ public class ExecutionSlot implements Runnable
catch(Exception e)
{
_plugin.getTask().set_State(TaskState.Rollingback);
onTaskUpdated(_plugin.getTask());
//onTaskUpdated(_plugin.getTask());
_plugin.rollback();
_plugin.getTask().set_State(TaskState.Rolledback);
onTaskUpdated(_plugin.getTask());
......
......@@ -18,18 +18,27 @@ public class Job
{
for (Step step : steps)
{
if(!step.validate()) return false;
if(step == null || !step.validate()) return false;
}
return true;
}
public Iterable<Task> getTasks()
public Iterable<Task> getTasks(TaskState... criteria)
{
try
{
for(Step step : steps)
{
if(step.isFailed()) break;
if(!step.isCompleted())
return step.getTasks();
return step.getTasks(criteria);
}
}
catch (Exception e)
{
e.printStackTrace();
}
return new ArrayList<>();
......@@ -39,7 +48,8 @@ public class Job
{
for(Step step : steps)
{
if(!step.isFinished()) return false;
if(step.isFailed()) return true;
if(!step.isCompleted()) return false;
}
return true;
......@@ -71,4 +81,27 @@ public class Job
}
}
}
public void resume()
{
for(Task task : getTasks(new TaskState[]{TaskState.Executing, TaskState.Queued}))
{
task.set_State(TaskState.New);
}
}
public void print()
{
System.out.println("Job enqued: " + id);
for(Step step : steps)
{
System.out.println("\tStep: " + step.tasks.size());
for(Task task : step.tasks)
{
System.out.println("\t\t" + task.get_State() + " " + task.taskId);
}
}
}
}
......@@ -74,4 +74,28 @@ public class OctopusConfiguration
public String getChaosApiKey() {
return chaosApiKey;
}
public void setPort(int port) {
this.port = port;
}
public void setOrchestratorIp(String orchestratorIp) {
this.orchestratorIp = orchestratorIp;
}
public void setOrchestratorPort(int orchestratorPort) {
this.orchestratorPort = orchestratorPort;
}
public void setNumberOfParallelTasks(int numberOfParallelTasks) {
this.numberOfParallelTasks = numberOfParallelTasks;
}
public void setChaosApiUrl(String chaosApiUrl) {
this.chaosApiUrl = chaosApiUrl;
}
public void setChaosApiKey(String chaosApiKey) {
this.chaosApiKey = chaosApiKey;
}
}
package com.chaos.octopus.commons.core;
import com.chaos.octopus.commons.core.Task;
import com.chaos.octopus.commons.core.TaskState;
import java.util.ArrayList;
import java.util.Collection;
public class Step
{
......@@ -21,11 +17,22 @@ public class Step
return true;
}
public boolean isFailed()
{
for(Task task : tasks)
{
if (task.get_State() == TaskState.Rolledback || task.get_State() == TaskState.Rollingback)
return true;
}
return false;
}
public boolean isFinished()
{
for(Task task : tasks)
{
if (task.get_State() != TaskState.Committed && task.get_State() != TaskState.Rolledback)
if (task.get_State() != TaskState.Committed)
return false;
}
......@@ -37,13 +44,14 @@ public class Step
return !tasks.isEmpty();
}
public Iterable<Task> getTasks()
public Iterable<Task> getTasks(TaskState... criteria)
{
ArrayList<Task> list = new ArrayList<>();
for (Task task : tasks)
{
if(TaskState.New.equals(task.get_State()))
//if(task.isQueueable() || task.get_State() == TaskState.Executing || task.get_State() == TaskState.Queued)
if(task.get_State().isIn(criteria))
list.add(task);
}
......
......@@ -26,4 +26,9 @@ public class Task
{
_State = state;
}
public boolean isQueueable()
{
return get_State() == TaskState.New;
}
}
......@@ -9,5 +9,32 @@ public enum TaskState
Committing,
Executed,
Rolledback,
Committed
Committed;
public static TaskState[] isQueueable()
{
return new TaskState[]{
TaskState.New
};
}
public static TaskState[] isCompleted()
{
return new TaskState[]{
TaskState.Committed,
TaskState.Rolledback
};
}
public boolean isIn(TaskState... criteria)
{
if(criteria.length == 0) return true;
for (TaskState state : criteria)
{
if(state == this) return true;
}
return false;
}
}
......@@ -60,7 +60,7 @@ public class TestPlugin implements Plugin, PluginDefinition
number += num;
}
System.err.println(getTask().taskId + ": " + num + ", executing");
System.out.println(getTask().taskId + ": " + num + ", executing");
}
if(getTask().properties.containsKey("randomstatus"))
......@@ -79,10 +79,10 @@ public class TestPlugin implements Plugin, PluginDefinition
{
int num = Integer.parseInt(getTask().properties.get("number"));
System.err.println(getTask().taskId + ": " + num + ", executed");
System.out.println(getTask().taskId + ": " + num + ", executed");
}
else
System.err.println(getId() + " executed");
System.out.println(getId() + " executed");
WasExecuted = true;
}
......
......@@ -64,10 +64,18 @@ public class AuthenticatedChaosClient
{
for(Job job : jobs)
{
jobSet(job);
}
}
public void jobSet(Job job) throws IOException
{
if(job.validate())
job.status = job.isComplete() ? "complete" : "inprogress";
else
job.status = "Invalid Job";
String data = gson.toJson(job);
PortalResponse response = gateway.call("POST", "v6/Job/Set", "sessionGUID=" + sessionId + "&data=" + data);
}
}
}
......@@ -6,6 +6,7 @@ import com.chaos.octopus.commons.core.Job;
import com.chaos.octopus.commons.core.Step;
import com.chaos.octopus.commons.core.Task;
import com.chaos.octopus.commons.core.TaskState;
import com.google.gson.Gson;
import org.junit.Test;
import java.util.Iterator;
......@@ -41,7 +42,7 @@ public class JobTest extends TestBase
step.tasks.add(task2);
task1.set_State(TaskState.Queued);
Iterator<Task> results = job.getTasks().iterator();
Iterator<Task> results = job.getTasks(TaskState.isQueueable()).iterator();
assertEquals(task2, results.next());
assertFalse(results.hasNext());
......@@ -61,10 +62,10 @@ public class JobTest extends TestBase
step1.tasks.add(task1);
step1.tasks.add(task2);
step2.tasks.add(task3);
task1.set_State(TaskState.Executed);
task2.set_State(TaskState.Executed);
task1.set_State(TaskState.Committed);
task2.set_State(TaskState.Committed);
Iterator<Task> results = job.getTasks().iterator();
Iterator<Task> results = job.getTasks(TaskState.isQueueable()).iterator();
assertEquals(task3, results.next());
assertFalse(results.hasNext());
......@@ -87,9 +88,9 @@ public class JobTest extends TestBase
step2.tasks.add(task3);
step2.tasks.add(task4);
task1.set_State(TaskState.Committed);
task2.set_State(TaskState.Executing);
task2.set_State(TaskState.Rolledback);
Iterator<Task> results = job.getTasks().iterator();
Iterator<Task> results = job.getTasks(TaskState.isQueueable()).iterator();
assertFalse(results.hasNext());
}
......@@ -137,7 +138,7 @@ public class JobTest extends TestBase
}
@Test
public void isComplete_AllStepsAreExecuted_ReturnFalse()
public void isComplete_AllStepsAreExecuted_ReturnTrue()
{
Job job = new Job();
Step step1 = new Step();
......@@ -153,7 +154,7 @@ public class JobTest extends TestBase
boolean result = job.isComplete();
assertFalse(result);
assertTrue(result);
}
@Test
......@@ -295,4 +296,33 @@ public class JobTest extends TestBase
assertSame(task2, actual);
}
@Test
public void validate_GivenBadlyFormattetJson_ReturnFalse()
{
String json = "{\"steps\": [{\"tasks\": [{\"pluginId\": \"com.chaos.octopus.CommandLinePlugin, 1.0.0\",\"properties\": {\"commandline\": \"/mnt/workset/cosound/wp0x-store/001_cosound/900_test/910_common/misc/test0001/3040_plugin/helloworld/development/main.sh\"}}]},\t ]}";
Job job = new Gson().fromJson(json, Job.class);
boolean b = job.validate();
assertFalse(b);
}
@Test
public void isComplete_TreatRollingbackAsRolledback_RetrunTrue()
{
Job job = new Job();
Step step1 = new Step();
Task task1 = make_Task();
Task task2 = make_Task();
job.steps.add(step1);
step1.tasks.add(task1);
task1.taskId = "unique taskId";
task2.taskId = "unique taskId";
task1.set_State(TaskState.Rollingback);
task2.set_State(TaskState.Rolledback);
boolean b = job.isComplete();
assertTrue(b);
}
}
......@@ -38,7 +38,7 @@ public class StepTest extends TestBase
step.tasks.add(task2);
task1.set_State(TaskState.Committed);
Iterator<Task> results = step.getTasks().iterator();
Iterator<Task> results = step.getTasks(TaskState.isQueueable()).iterator();
assertEquals(task2, results.next());
assertFalse(results.hasNext());
......
......@@ -6,12 +6,6 @@ import com.chaos.octopus.agent.plugin.CommandLinePlugin;
import com.chaos.octopus.commons.core.OctopusConfiguration;
import com.chaos.octopus.commons.core.TestPlugin;
import com.chaos.octopus.server.OrchestratorImpl;
import com.chaos.octopus.server.synchronization.EnqueueJobs;
import com.chaos.octopus.server.synchronization.Synchronization;
import com.chaos.octopus.server.synchronization.UpdateJob;
import com.chaos.sdk.AuthenticatedChaosClient;
import com.chaos.sdk.Chaos;
import com.sun.corba.se.impl.orbutil.concurrent.Sync;
import org.apache.commons.daemon.DaemonContext;
import org.apache.commons.daemon.DaemonInitException;
......@@ -29,15 +23,11 @@ public class Daemon implements org.apache.commons.daemon.Daemon
@Override
public void start() throws Exception
{
System.err.println("HeapSize: " + Runtime.getRuntime().totalMemory());
System.err.println("HeapMaxSize: " + Runtime.getRuntime().maxMemory());
System.err.println("HeapFreeSize: " + Runtime.getRuntime().freeMemory());
config = new OctopusConfiguration();
if(config.getIsAgent())
{
agent = new Agent(config.getOrchestratorIp(), config.getOrchestratorPort(), config.getListeningPort(), config.getNumberOfParallelTasks());
agent = Agent.create(config);
agent.addPlugin(new TestPlugin());
agent.addPlugin(new CommandLinePlugin());
agent.addPlugin(new ChaosPlugin());
......
......@@ -6,6 +6,7 @@ import com.chaos.octopus.agent.Agent;
import com.chaos.octopus.agent.plugin.ChaosPlugin;
import com.chaos.octopus.agent.plugin.CommandLinePlugin;
import com.chaos.octopus.commons.core.Job;
import com.chaos.octopus.commons.core.OctopusConfiguration;
import com.chaos.octopus.commons.core.TestPlugin;
import com.chaos.octopus.server.*;
import com.google.gson.*;
......@@ -23,8 +24,18 @@ public class Program
System.out.println("Initializing Octopus...");
try(OrchestratorImpl leader = new OrchestratorImpl(port);
Agent agent = new Agent("localhost", port, port +1))
OctopusConfiguration orcConfig = new OctopusConfiguration();
orcConfig.setChaosApiUrl("http://api.cosound.chaos-systems.com/v6/");
orcConfig.setChaosApiKey("b22058bb0c7b2fe4bd3cbffe99fe456b396cbe2083be6c0fdcc50b706d8b4270");
orcConfig.setPort(2500);
OctopusConfiguration agentConfig = new OctopusConfiguration();
agentConfig.setPort(2501);
agentConfig.setOrchestratorIp("127.0.0.1");
agentConfig.setOrchestratorPort(2500);
try(OrchestratorImpl leader = OrchestratorImpl.create(orcConfig);
Agent agent = Agent.create(agentConfig))
{
agent.addPlugin(new TestPlugin());
agent.addPlugin(new CommandLinePlugin());
......
package com.chaos.octopus.server;
import com.chaos.octopus.commons.core.Job;
import com.chaos.octopus.commons.core.Task;
import com.chaos.octopus.commons.core.TaskState;
import com.chaos.octopus.commons.core.*;
import com.chaos.octopus.commons.exception.ConnectException;
import com.chaos.octopus.commons.exception.DisconnectError;
import java.util.ArrayList;
......@@ -39,6 +36,9 @@ public class AllocationHandler implements AutoCloseable
if(j.id.equals(job.id)) return;
}
job.print();
job.resume();
_Jobs.add(job);
enqueueNextTaskOnAgent();
......@@ -59,8 +59,7 @@ public class AllocationHandler implements AutoCloseable
{
for (Job job : _Jobs)
{
for(Task task : job.getTasks())
for(Task task : job.getTasks(TaskState.isQueueable()))
{
enqueue(task);
}
......
......@@ -35,4 +35,9 @@ public class ConcurrentJobQueue
return result;
}
public boolean contains(String id)
{
return _hashJobs.containsKey(id);
}
}
......@@ -49,8 +49,8 @@ public class OrchestratorImpl implements Orchestrator, Runnable
Chaos chaos = new Chaos(config.getChaosApiUrl());
AuthenticatedChaosClient client = chaos.authenticate(config.getChaosApiKey());
sync.addSynchronizationTask(new EnqueueJobs(leader, client));
sync.addSynchronizationTask(new UpdateJob(queue, client));
sync.addSynchronizationTask(new EnqueueJobs(leader, client));
return leader;
}
......@@ -141,22 +141,34 @@ public class OrchestratorImpl implements Orchestrator, Runnable
@Override
public void taskCompleted(Task task)
{
try
{
Job job = _AllocationHandler.getJob(task);
_jobsWithUpdates.put(job);
_AllocationHandler.taskComplete(task);
_jobsWithUpdates.put(job);
}
catch (ArrayIndexOutOfBoundsException e)
{
// No job found
}
}
@Override
public void taskUpdate(Task task)
{
try
{
Job job = _AllocationHandler.getJob(task);
_jobsWithUpdates.put(job);
_AllocationHandler.taskUpdate(task);
_jobsWithUpdates.put(job);
}
catch (ArrayIndexOutOfBoundsException e)
{
// No job found
}
}
@Override
......@@ -168,6 +180,8 @@ public class OrchestratorImpl implements Orchestrator, Runnable
@Override
public void enqueue(Job job)
{
if(_jobsWithUpdates.contains(job.id)) return;
_AllocationHandler.enqueue(job);
}
......@@ -177,6 +191,7 @@ public class OrchestratorImpl implements Orchestrator, Runnable
if(_socket != null) _socket.close();
if(_AllocationHandler != null) _AllocationHandler.close();
if(_synchronization != null) _synchronization.stop();
}
public List<String> parsePluginList(byte[] data)
......
......@@ -24,7 +24,10 @@ public class EnqueueJobs implements SynchronizationTask
{
for(Job job : client.jobGet())
{
if(job.validate() && !job.isComplete())
orchestrator.enqueue(job);
else
client.jobSet(job);
}
}
catch (IOException e)
......
......@@ -44,4 +44,9 @@ public class Synchronization extends TimerTask
task.action();
}
}
public void stop()
{
if(timer != null) timer.shutdownNow();
}
}
......@@ -25,7 +25,7 @@ public class AllocationHandlerTest
public void enqueue_JobWithOneStep_CallEnqueueWithTask() throws DisconnectError {
AllocationHandler ah = new AllocationHandler();
Job job = make_JobWithOneStep();
Task task = job.getTasks().iterator().next();
Task task = job.getTasks(TaskState.isQueueable()).iterator().next();
AgentProxy agent = mock(AgentProxy.class);
ah.addAgent(agent);
......
......@@ -2,6 +2,8 @@ package com.chaos.octopus.server.unit.Synchronization;
import com.chaos.octopus.commons.core.Job;
import com.chaos.octopus.commons.core.Orchestrator;
import com.chaos.octopus.commons.core.Step;
import com.chaos.octopus.commons.core.Task;
import com.chaos.octopus.server.synchronization.EnqueueJobs;
import com.chaos.sdk.AuthenticatedChaosClient;
import org.junit.Test;
......@@ -19,6 +21,10 @@ public class EnqueueJobsTest
AuthenticatedChaosClient chaos = mock(AuthenticatedChaosClient.class);
EnqueueJobs enqueueJobs = new EnqueueJobs(orchestrator, chaos);
Job job = new Job();
Step step = new Step();
Task task = new Task();
step.tasks.add(task);
job.steps.add(step);
when(chaos.jobGet()).thenReturn(Arrays.asList(job));
enqueueJobs.action();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment