Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions java/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>RabbitMQInActionExampleCode</groupId>
<artifactId>RabbitMQInActionExampleCode</artifactId>
<version>0.0.1-SNAPSHOT</version>
<build>
<sourceDirectory>appendix-a</sourceDirectory>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source />
<target />
</configuration>
</plugin>
</plugins>

</build>
<dependencies>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.0.0</version>
</dependency>


<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<!-- Not necessary, used for handle String manipulation mainly -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.7</version>
</dependency>


<!-- Json support -->
<!-- https://mvnrepository.com/artifact/org.json/json -->
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20171018</version>
</dependency>



</dependencies>
</project>
21 changes: 21 additions & 0 deletions java/src/com/SampleCode/ch2/BasicConfirmListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.SampleCode.ch2;

import java.io.IOException;

import com.rabbitmq.client.ConfirmListener;

public class BasicConfirmListener implements ConfirmListener {

@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println(" - (BasicConfirmListener); handle ack msg for: " + deliveryTag);

}

@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println(" - (BasicConfirmListener); handle NNNack msg for: " + deliveryTag);

}

}
39 changes: 39 additions & 0 deletions java/src/com/SampleCode/ch2/BasicConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.SampleCode.ch2;

import java.io.IOException;
import java.util.Map;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;


/**
* Consumer used within ch2 example which simply print out the msg with prefix && Meta.
* @author andykwok
*
*/
public class BasicConsumer extends DefaultConsumer{

public BasicConsumer(Channel channel) {
super(channel);
}


public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String contentType = properties.getContentType();
Map<String, Object> header = properties.getHeaders();
long deliveryTag = envelope.getDeliveryTag();


String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
System.out.println(" - ContentType: " + contentType);
System.out.println(" - Header: " + header);
System.out.println(" - DeliveryTag: " + deliveryTag);
}

}
65 changes: 65 additions & 0 deletions java/src/com/SampleCode/ch2/Listing2_1_HelloWorldConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.SampleCode.ch2;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;

import com.SampleCode.util.LocalConnFactory;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.Exchange.DeclareOk;
import com.rabbitmq.client.AMQP.Queue;

/**
* Java version of Listing2.1 Example (Hello World consumer)
* P.31
* @author andykwok
*
*/
public class Listing2_1_HelloWorldConsumer {
public static void main(String[] args) {



//Exchange config
String exName = "Hello-exchange";
String exType = "direct";


//Queue
String qName = "hello-queue";
String routing_key = "hola";


//Construct a factory upon above parameters
LocalConnFactory factory = new LocalConnFactory();
try(Connection conn = factory.newConnection();
Channel ch = conn.createChannel();) {

//Boolean: durable || autoDelete || internal (Passive)?
if(ch.exchangeDeclare(exName, exType, true, false, false, null) != null) {
if (ch.queueDeclare(qName, true, false, true, null) != null) {
if (ch.queueBind(qName, exName, routing_key) != null) {

System.out.println(" - Fetch msg once per second, repeat for 100 times");
//Reuse consumer instance
BasicConsumer consumer = new BasicConsumer(ch);
for (int i=0 ; i<100 ; i++) {
System.out.println("/////////////Consumer loop[" + (i+1) + "]");
//ch.basicConsume(qName, consumer);
ch.basicConsume(qName, true, consumer);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {System.err.println("Thread sleep error"); }
}
}else {System.err.println("Bind Queue Error!");}
}else {System.err.println("Error, queue[" + qName + "] can't not be create!");}
}else {System.err.println("Error, can't declare exchange <Hello-exchange>!");}
} catch (IOException | TimeoutException e) {
System.err.println("Exception happen, triggered autocloseable to shut down connection, bye~");}
}
}
80 changes: 80 additions & 0 deletions java/src/com/SampleCode/ch2/Listing2_1_HelloWorldProducer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.SampleCode.ch2;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.SampleCode.util.LocalConnFactory;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.Queue;
import com.rabbitmq.client.AMQP.Exchange.DeclareOk;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
* Java version of Listing2.1 Example (Hello World producer)
* P.29
*
* Noted that as queue is not specificed in this case yet, all msg publish from
* this metho wouldn't be received by {@link Listing2_1_HelloWorldConsumer} which
* have specify "hello-queue" as its queue to use.
* @author andykwok
*
*/
public class Listing2_1_HelloWorldProducer {


public static void main(String[] args) {


//Exchange config
String exName = "Hello-exchange";
String exType = "direct";

//Queue
String qName = "hello-queue";

String routing_key = "hola";
String payload = "Hello RabbitMQ";
//Properties
BasicProperties properties = new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.build();

//Construct a factory upon above parameters
LocalConnFactory factory = new LocalConnFactory();

try(Connection conn = factory.newConnection();
/*
* No com.rabbitmq.client.AMQP,
* No com.rabbitmq.client.impl.AMQPImpl
* But com.rabbitmq.client.Channel
*/
Channel ch = conn.createChannel();) {

//Passive == internal???
if(ch.exchangeDeclare(exName, exType, true, false, false, null) != null) {
//Bind to queue "hello-queue" if want to let helloWorldConsumer to be workable.
if (ch.queueDeclare(qName, true, false, true, null) != null) {
if (ch.queueBind(qName, exName, routing_key) != null ) {

System.out.println(" - Everything set, start to consume once per second for 50 times: ");
for (int i=0 ; i<50 ; i++) {
try {
Thread.sleep(1000);
ch.basicPublish(exName, routing_key, properties, (payload + "[" + i + "]") .getBytes());
System.out.println(" - Publish msg: " + payload + "[" + i + "]");
} catch (InterruptedException e) {e.printStackTrace();}
}
}else {System.err.println("Queue declare Error!"); }
}else {System.err.println("Declare Queue error!");}
}else {System.err.println("Error, can't declare exchange <Hello-exchange>!");}
} catch (IOException | TimeoutException e) {System.err.println("Exception happen, triggered autocloseable bye~");}


}



}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package com.SampleCode.ch2;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.SampleCode.util.LocalConnFactory;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.Queue;
import com.rabbitmq.client.AMQP.Exchange.DeclareOk;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
* Java version of Listing2.1 Example (Hello World producer)
* P.29
*
* Noted that as queue is not specificed in this case yet, all msg publish from
* this metho wouldn't be received by {@link Listing2_1_HelloWorldConsumer} which
* have specify "hello-queue" as its queue to use.
* @author andykwok
*
*/
public class Listing2_3_HelloWorldProducerWithConfirm {


public static void main(String[] args) {

//Exchange config
String exName = "Hello-exchange";
String exType = "direct";

//Queue
String qName = "hello-queue";

String routing_key = "hola";
String payload = "Hello RabbitMQ";
//Properties
BasicProperties properties = new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.build();

//Construct a factory upon above parameters
LocalConnFactory factory = new LocalConnFactory();

try(Connection conn = factory.newConnection();
/*
* No com.rabbitmq.client.AMQP,
* No com.rabbitmq.client.impl.AMQPImpl
* But com.rabbitmq.client.Channel
*/
Channel ch = conn.createChannel();) {

//Passive == internal???
if(ch.exchangeDeclare(exName, exType, true, false, false, null) != null) {
//Bind to queue "hello-queue" if want to let helloWorldConsumer to be workable.
if (ch.queueDeclare(qName, true, false, true, null) != null) {
if (ch.queueBind(qName, exName, routing_key) != null ) {
//Confirm listener
ch.addConfirmListener(new BasicConfirmListener());
//Enable confirm support
ch.confirmSelect();
/**
* However in automatic mode, all msg would be consider send out succcessfully
* after basicPublish (No checking).
*/
ch.basicPublish(exName, routing_key, properties, payload.getBytes());
System.out.println(" - Publish msg: " + payload);
//ConfirmListener take time to react which is a separate thread, so
//put main thread on sleep for 5s in order to wait.
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}else {System.err.println("Queue bind erroe!"); }
}else {System.err.println("Queue declare error!");}
}else {System.err.println("Error, can't declare exchange <Hello-exchange>!");}



} catch (IOException | TimeoutException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}


}



}
12 changes: 12 additions & 0 deletions java/src/com/SampleCode/ch2/package-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/**
* A basic workable pair of producer and consumer to demonstrate basic
* Rabbit usage. Listign 2_3 is simalr as 2_1, the only different is
* 2_3 would wait 5s after msg send, if the msg deliver to consumer
* with in the period of time, acknowledge stage would send back to producer
* and ConfirmListener would print out acknowledge msg.
*/
/**
* @author andykwok
*
*/
package com.SampleCode.ch2;
Loading