Skip to content
Snippets Groups Projects
Commit 6226e1fe authored by Simon Spinner's avatar Simon Spinner
Browse files

Add delegation handler.

parent 354d3a97
No related branches found
No related tags found
No related merge requests found
......@@ -175,6 +175,10 @@ public class AgentController {
public void handleNotification(ModelNotification notification) {
}
public String[] getDelegationDestinations(EObject object) {
return new String[0];
}
protected void checkRunning() {
if (!running) {
throw new IllegalStateException("The agent is not running.");
......
......@@ -43,7 +43,8 @@ public abstract class AgentScope extends Scope {
private final SystemGlobalScope systemScope;
private final Map<String, AgentScope> scopeCache = new HashMap<>();
private final List<NotificationDefinition> notifications = new ArrayList<>();
private final Map<String, DelegationListener> delegations = new HashMap<>();
protected class NotificationDefinition {
private final Class<?> key;
private final String exchange;
......@@ -74,19 +75,28 @@ public abstract class AgentScope extends Scope {
}
protected class DelegationListener implements Listener {
private AgentController controller;
public void setDelegationHandler(AgentController controller) {
this.controller = controller;
}
@Override
public void newMessage(String routingKey, Map<String, Object> headers, String message) {
ModelNotification notification = new ModelNotification(getModelRepository(), headers);
String[] destinations = notification.getDestinationScopes();
for (String curScopeName : destinations) {
AgentScope curScope = scopeCache.get(curScopeName);
if (curScope == null) {
curScope = (AgentScope) getModelRepository().getScope(curScopeName);
curScope.create();
scopeCache.put(curScopeName, curScope);
if (controller != null) {
ModelNotification notification = new ModelNotification(getModelRepository(), headers);
EObject instance = notification.getInstance();
String[] destinations = controller.getDelegationDestinations(instance);
for (String curScopeName : destinations) {
AgentScope curScope = scopeCache.get(curScopeName);
if (curScope == null) {
curScope = (AgentScope) getModelRepository().getScope(curScopeName);
curScope.create();
scopeCache.put(curScopeName, curScope);
}
curScope.registerElement(notification.getInstance());
curScope.sendNotifications(notification.getNotificationType(), instance);
}
curScope.registerElement(notification.getInstance());
curScope.sendNotifications(notification.getNotificationType(), notification.getInstance());
}
}
}
......@@ -306,6 +316,13 @@ public abstract class AgentScope extends Scope {
});
}
public void addDelegationHandler(String exchange, AgentController agent) {
DelegationListener cur = delegations.get(exchange);
if (cur != null) {
cur.setDelegationHandler(agent);
}
}
protected void declareNotification(NotificationDefinition notification) {
this.notifications.add(notification);
notification.create();
......@@ -316,7 +333,9 @@ public abstract class AgentScope extends Scope {
String delegationQueueName = exchange + ".Delegation";
getMessageBus().createQueue(delegationQueueName);
getMessageBus().bindQueue(delegationQueueName, exchange, "#");
getMessageBus().addMessageListener(delegationQueueName, new DelegationListener());
DelegationListener listener = new DelegationListener();
getMessageBus().addMessageListener(delegationQueueName, listener);
delegations.put(exchange, listener);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment