Skip to content
Snippets Groups Projects
Commit f0aaae80 authored by Simon Spinner's avatar Simon Spinner
Browse files

Add message bus abstraction.

parent be4e1342
No related branches found
No related tags found
No related merge requests found
......@@ -19,7 +19,9 @@ Export-Package: tools.descartes.prisma.core,
tools.descartes.prisma.core.impl,
tools.descartes.prisma.core.scopes,
tools.descartes.prisma.core.util
Import-Package: org.apache.commons.logging,
Import-Package: com.rabbitmq.client,
com.rabbitmq.client.impl;version="3.6.1",
org.apache.commons.logging,
org.eclipse.emf.cdo.net4j,
org.eclipse.net4j,
org.eclipse.net4j.connector,
......
package tools.descartes.prisma.core;
import java.util.Map;
public interface MessageBus {
public interface Listener {
void newMessage(Map<String, Object> headers, String message);
}
public enum ExchangeType {
DIRECT, FANOUT, TOPIC, HEADERS
}
void connect();
void disconnect();
void createQueue(String queue);
void createExchange(String exchange, ExchangeType type);
void bindQueue(String queue, String exchange, String routingKey);
void addMessageListener(String queue, final Listener listener);
void publishMessage(String exchange, String routingKey, String message, Map<String, Object> headers);
}
package tools.descartes.prisma.core.exceptions;
public class MessagingException extends RuntimeException {
public MessagingException(Throwable cause) {
super(cause);
}
}
package tools.descartes.prisma.core.impl;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import tools.descartes.prisma.core.MessageBus;
import tools.descartes.prisma.core.exceptions.MessagingException;
public class RabbitMQMessageBus implements MessageBus {
private final Log log = LogFactory.getLog(RabbitMQMessageBus.class);
private final String userName;
private final String password;
private final String virtualHost;
private final String host;
private final int port;
private Connection connection;
private Channel channel;
public RabbitMQMessageBus(String userName, String password, String virtualHost, String host, int port) {
super();
this.userName = userName;
this.password = password;
this.virtualHost = virtualHost;
this.host = host;
this.port = port;
}
@Override
public void connect() {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUsername(userName);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setHost(host);
connectionFactory.setPort(port);
try {
connection = connectionFactory.newConnection();
channel = connection.createChannel();
} catch (IOException | TimeoutException e) {
log.error("Error connecting to RabbitMQ server", e);
throw new MessagingException(e);
}
}
@Override
public void disconnect() {
try {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
} catch (IOException | TimeoutException e) {
log.error("Error disconnecting from RabbitMQ server", e);
throw new MessagingException(e);
}
}
public void createExchange(String exchange, ExchangeType type) {
try {
String typeString = "";
switch (type) {
case DIRECT:
typeString = "direct";
break;
case FANOUT:
typeString = "fanout";
break;
case TOPIC:
typeString = "topic";
break;
case HEADERS:
typeString = "headers";
break;
}
channel.exchangeDeclare(exchange, typeString, false);
} catch (IOException e) {
log.error("Error creating exchange in RabbitMQ server", e);
throw new MessagingException(e);
}
}
public void createQueue(String queue) {
try {
channel.queueDeclare(queue, false, false, false, null);
} catch (IOException e) {
log.error("Error creating queue in RabbitMQ server", e);
throw new MessagingException(e);
}
}
public void bindQueue(String queue, String exchange, String routingKey) {
try {
channel.queueBind(queue, exchange, routingKey);
} catch (IOException e) {
log.error("Error creating queue in RabbitMQ server", e);
throw new MessagingException(e);
}
}
public void publishMessage(String exchange, String routingKey, String message, Map<String, Object> headers) {
byte[] bodyBytes = message.getBytes();
try {
channel.basicPublish(exchange, routingKey, new AMQP.BasicProperties.Builder().headers(headers).build(),
bodyBytes);
} catch (IOException e) {
log.error("Error publishing message to RabbitMQ server", e);
throw new MessagingException(e);
}
}
public void addMessageListener(String queue, final Listener listener) {
try {
boolean autoAck = false;
channel.basicConsume(queue, autoAck, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
listener.newMessage(properties.getHeaders(), new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
} catch (IOException e) {
log.error("Error registering consumer at RabbitMQ server", e);
throw new MessagingException(e);
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment