Getting Started
Dependencies
The below dependency is the bare minimum one that will be needed with any project. The remaining depends on the type of the project. The below dependency add the core layer of HMF4J to your project
<dependency>
<groupId>io.github.handy-messaging-framework</groupId>
<artifactId>hmf4j-core</artifactId>
<version>1.1.0</version>
</dependency>
Quick Example
Example here illustrates how to use the HMF4J framework to send and receive messages from the Apache Kafka messaging system.
Pre-requisites for the example
- Kafka ver 2.13 running
- Create a topic named
demo_topic
in Kafka
Apart from the dependencies listed above, include the following dependecies to your pom.xml
file.
<dependency>
<groupId>io.github.handy-messaging-framework</groupId>
<artifactId>hmf4j-kafka-connector</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>io.github.handy-messaging-framework</groupId>
<artifactId>hmf4j-types-simplemessage</artifactId>
<version>1.1.0</version>
</dependency>
The above dependencies add the following capabilities:
- handy-messaging-framework’s Kafka connector which enables the framework to interface with Kafka
- SimpleMessage - A messaging schema designed on Google’s protobuf standard. More details on this later.
Configuration file
Create a file by the name hmf4j-conf.yml
in the application’s resource
directory
The contents of the hmf4j-conf.yml
file will look as:
hmf4j:
profiles:
- profileName: profile1
system: kafka
consumer:
properties:
bootstrap.servers: localhost:9092
group.id: test_app
max.messages.per.batch: 3
max.poll.duration.millis: 10000
producer:
properties:
bootstrap.servers: localhost:9092
In the configuration we have defined a profile called profile1
which uses kafka
as the messaging system. Its producer and consumer properties are defined below.
Lets take the case of the Producer example now. The producer in a messaging system creates and sends messages through the messaging channel. In our example the producer will be a Kafka producer.
Sending a message
public class SimpleProducer {
MessageProducerSystem producerSystem;
/**
* Initializes the producer system. Producer is asked to use `profile1` and write to `demo_topic`
*/
public void SimpleProducer() {
producerSystem = new MessageProducerSystem("profile1", "demo_topic");
}
/**
* Function sends the message generated using the producer to the Kafka `demo_topic`
*/
public void sendMsg() {
SimpleMessage contentMsg = generateMessage();
this.producerSystem.sendMessage(contentMsg);
this.producerSystem.close();
System.out.println("Message sent");
}
/**
* Function generates a message with SimpleMessage schema.
*
* @return
*/
private SimpleMessage generateMessage() {
SimpleMessage contentMsg = new SimpleMessage();
contentMsg.setContentSchema(String.class.toString());
contentMsg.setDateTime(Optional.of(Date.from(Instant.now())));
contentMsg.setMessageId("msg-1");
contentMsg.setPayload("Hello, this is a sample message".getBytes());
contentMsg.setSender("app-1");
return contentMsg;
}
}
The producerSystem
uses profile1
as the profile and writes to demo_topic
. profile1
is defined in the hmf4j-conf.yml
to use the Kafka messaging system. Run the program and verify that the data lands in the demo_topic
in Kafka. Use the protobuf compiler to deserialize and verify the data written to the Kafka topic.
Consuming message
Initialzing the Message Consumer
public class SimpleConsumer {
MessageConsumingSystem consumingSystem;
SimpleHandler messageHandler;
/**
* Type of the message expected to be received by the consumer.
* In this case it is of the type SimpleMessage
*/
String getMessageTypeClass(){
return "io.github.handy.messaging.types.simplemessage.SimpleMessage";
}
/**
* Initializes and starts the consumer. The consumer is initialized with profile1 and the demo_topic.
* The consumer will start listening from this point.
* Once a message is received in the channel, the `messageHandler` is invoked.
*/
public SimpleConsumer(){
messageHandler = new SimpleHandler();
MessageConsumingSystem.getInstance().setupConsumer("profile1",
"demo_topic",
getMessageTypeClass(), messageHandler);
}
}
The above class initializes and starts the MessageConsumingSystem. It is asked to use profile1
. This implies the consumer will connect to Kafka. The kafka topic to which it listens to will be demo_topic
.
When a message is received by the consumer it passes it over to the messageHandler defined below:
Message Handler
public class SimpleHandler implements MessageHandler {
/**
* Function that gets invoked for every message read from the channel
* @param message - Message received from the channel
*/
@Override
public void handleMessage(Message message) {
SimpleMessage msg = (SimpleMessage) message;
System.out.println(String.format("Id: %s",msg.getId()));
System.out.println(String.format("Message payload: %s", new String(msg.getId())));
}
/**
* Function decides if a new instance of the handler is needed for each of the messages received
* @return optional MessageHandler
*/
@Override
public Optional<MessageHandler> getNewInstance() {
return Optional.empty();
}
}
The SimpleHandler
class here is an extension of MessageHandler
which is a pre-requisite for registering a class as a message handler. Whenever a new message is received, the handleMessage(Message message) function gets invoked
. The getNewInstance()
function decides if a new instance of the SimpleHandler class is needed for every message received from the input channel.
Springboot Integration
The framework provides an easy way to integrate with your Springboot application. This is by using the hmf4j-springboot
artifact. The maven coordinates for this artifact is given below:
<dependency>
<groupId>io.github.handy-messaging-framework</groupId>
<artifactId>hmf4j-springboot</artifactId>
<version>1.1.0</version>
</dependency>
Here is an example of the above code using HMF4J’s Springboot Integration.
The configuration file - hmf4j-conf.yml
remains the same as above.
Springboot Main Class
@ComponentScan(basePackages = {"io.github.handy.messaging", "org.example"})
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
}
The main
class has explicit scanning for the package prefix io.github.handy.messaging
. This enables bootstrapping the hmf4j-conf.yaml
Sending Messages
The first step towards sending a message is to register the Producer bean. This can be done as follows:
@Component
public class SampleProducer extends MessageProducerFoundation {
@Override
protected String getQueueName() {
return "test_topic";
}
@Override
protected String getProfile() {
return "profile1";
}
public SampleProducer() throws InterruptedException {
super();
}
}
- The class
SampleProducer
is inhertingMessageProducerFoundation
. TheMessageProducerFoundation
class performs all the wiring that is needed to setup a producer. - As seen above, the
getProfile()
function tells the producer which profile from thehmf4j-conf.yml
has to be used for setting up the producer. In this case we are usingprofile1
, which per the configuration uses thesystem
askafka
. This means that theSampleProducer
wired up to send messages to a Kafka messaging system.
The SampleProducer
is configured to write to the topic test_topic
per the getQueueName()
function
Next step is to use the above producer to send a message. The below code illustrates that:
@Component
public class ProducerExec {
public ProducerExec(SampleProducer producer){
SimpleMessage contentMsg = new SimpleMessage();
contentMsg.setContentSchema(String.class.toString());
contentMsg.setDateTime(Optional.of(Date.from(Instant.now())));
contentMsg.setMessageId("msg-1");
contentMsg.setPayload("Hello, this is a sample message".getBytes());
contentMsg.setSender("app-1");
producer.sendMessage(contentMsg);
System.out.println("Message sent");
}
}
The above ProducerExec
class is a Spring Bean that while getting bootstrapped uses the SampleProducer
that has been defined before to send a message to the Kafka