Skip to content
Snippets Groups Projects
Commit c3d9b6f1 authored by Simon Spinner's avatar Simon Spinner
Browse files

Add exchange information to newMessage.

parent 2352cc8a
No related branches found
No related tags found
No related merge requests found
......@@ -5,7 +5,7 @@ import java.util.Map;
public interface MessageBus {
public interface Listener {
void newMessage(String routingKey, Map<String, Object> headers, String message);
void newMessage(String exchange, String routingKey, Map<String, Object> headers, String message);
}
public enum ExchangeType {
......
......@@ -175,7 +175,7 @@ public class AgentController {
public void handleNotification(ModelNotification notification) {
}
public String[] getDelegationDestinations(EObject object) {
public String[] getDelegationDestinations(String exchange, String routingKey, EObject object) {
return new String[0];
}
......
......@@ -150,7 +150,8 @@ public class RabbitMQMessageBus implements MessageBus {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
listener.newMessage(envelope.getRoutingKey(), properties.getHeaders(), new String(body));
listener.newMessage(envelope.getExchange(), envelope.getRoutingKey(), properties.getHeaders(),
new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
......
......@@ -82,11 +82,11 @@ public abstract class AgentScope extends Scope {
}
@Override
public void newMessage(String routingKey, Map<String, Object> headers, String message) {
public void newMessage(String exchange, String routingKey, Map<String, Object> headers, String message) {
if (controller != null) {
ModelNotification notification = new ModelNotification(getModelRepository(), headers);
EObject instance = notification.getInstance();
String[] destinations = controller.getDelegationDestinations(instance);
String[] destinations = controller.getDelegationDestinations(exchange, routingKey, instance);
for (String curScopeName : destinations) {
AgentScope curScope = scopeCache.get(curScopeName);
if (curScope == null) {
......@@ -310,7 +310,7 @@ public abstract class AgentScope extends Scope {
getMessageBus().bindQueue(queueName, exchange, routingKey);
getMessageBus().addMessageListener(queueName, new Listener() {
@Override
public void newMessage(String routingKey, Map<String, Object> headers, String message) {
public void newMessage(String exchange, String routingKey, Map<String, Object> headers, String message) {
agent.handleNotification(new ModelNotification(getModelRepository(), headers));
}
});
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment