ํ๋ก์ ํธ ๊ฐ์
์ฑ <-> ์๋ฒ <-> ๋๋ฐ์ด์ค๋ก ๊ตฌ์ฑ๋ ํ๋ก์ ํธ๋ฅผ ์ํํ๋ ์ค ๋ค์์ ์ฌ์ฉ์๊ฐ ์ฑ์์ ๋ฐํํ๋ ์ด๋ฒคํธ๋ฅผ ๋๋ฐ์ด์ค์์ ๋น ๋ฅด๊ณ ๊ฐ๋ณ๊ฒ ๋ฐ์ ์ ์๋๋ก ํ๊ธฐ ์ํด MQTT๋ฅผ ๋์ ํ์๋ค.
- Spring ์๋ฒ์ MQTT ๋ธ๋ก์ปค ์ค์น
- Mosquitto, EMQX, HiveMQ ๋ฑ MQTT ๋ธ๋ก์ปค๋ฅผ ์๋ฒ์ ์ค์น.
- ํฌํธ(๋ณดํต 1883, ๋๋ TLS์ผ ๊ฒฝ์ฐ 8883) ์ด๊ธฐ.
- MQTT ๋ธ๋ก์ปค๊ฐ ์ธ๋ถ(๋๋ฐ์ด์ค)์์ ์ ๊ทผ ๊ฐ๋ฅํด์ผ ํ๋ฏ๋ก ๋ฐฉํ๋ฒฝ/๋ณด์ ๊ทธ๋ฃน ์ค์ ํ์.
- ๋๋ฐ์ด์ค์์ MQTT ํด๋ผ์ด์ธํธ ๊ตฌํ
- ๋๋ฐ์ด์ค์์ MQTT ํด๋ผ์ด์ธํธ๋ฅผ ์คํ์์ผ์ ์๋ฒ์ ๋ธ๋ก์ปค์ ์ฐ๊ฒฐ.
- ์: mqtt://your.server.ip:1883
- ํ ํฝ์ ๊ตฌ๋ ํ์ฌ ์๋ฒ์์ publishํ๋ ๋ฉ์์ง๋ฅผ ๋ฐ์ ์ ์๊ฒ ํจ.
- Spring ์๋ฒ์์ MQTT ๋ฉ์์ง ๋ฐํ
- ๋๋ฐ์ด์ค๊ฐ ๊ตฌ๋ ์ค์ธ ํ ํฝ์ผ๋ก ๋ฉ์์ง๋ฅผ ๋ฐํ (publish).
- Spring์์๋ Eclipse Paho ํด๋ผ์ด์ธํธ๋ฅผ ์ด์ฉํ๊ฑฐ๋, MQTT ์ง์ ๋ผ์ด๋ธ๋ฌ๋ฆฌ(e.g., spring-integration-mqtt)๋ฅผ ์ด์ฉ.
1. Mac ํ๊ฒฝ์์ Mosquitto ์ค์น
์ค์น
brew install mosquitto
์คํ
brew services start mosquitto
conf๋ก ์คํ
/opt/homebrew/opt/mosquitto/sbin/mosquitto -c /opt/homebrew/etc/mosquitto/mosquitto.conf
2. Spring Integration MQTT
https://docs.spring.io/spring-integration/reference/mqtt.html#mqtt-outbound
MQTT Support :: Spring Integration
If a single MQTT ClientID is required for several integrations, multiple MQTT client instances cannot be used because MQTT brokers may have a limitation on a number of connections per ClientID (typically, a single connection is allowed). For having a singl
docs.spring.io
MqttConnectOptions
https://eclipse.dev/paho/files/javadoc/org/eclipse/paho/client/mqttv3/MqttConnectOptions.html
MqttConnectOptions
Set a list of one or more serverURIs the client may connect to. Each serverURI specifies the address of a server that the client may connect to. Two types of connection are supported tcp:// for a TCP connection and ssl:// for a TCP connection secured by SS
eclipse.dev
build.gradle
// Spring Integration
implementation 'org.springframework.boot:spring-boot-starter-integration'
// Spring MQTT
implementation "org.springframework.integration:spring-integration-mqtt:6.4.3"
configuration
@Configuration
public class MqttOutboundConfig {
@Value("${mqtt.url}")
private String brokerUrl;
@Value("${mqtt.client-id}")
private String clientId;
@Value("${mqtt.default-topic}")
private String defaultTopic;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{brokerUrl});
options.setCleanSession(true);
factory.setConnectionOptions(options);
return factory;
}
@Bean
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler(clientId, mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);
return messageHandler;
}
@Bean
public IntegrationFlow mqttOutboundFlow() {
return IntegrationFlow.from("mqttOutboundChannel")
.handle(mqttOutbound())
.get();
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}
1. ๋ณ์
@Value("${mqtt.url}")
private String brokerUrl;
@Value("${mqtt.client-id}")
private String clientId;
@Value("${mqtt.default-topic}")
private String defaultTopic;
brokerUrl : MQTT ๋ธ๋ก์ปค๊ฐ ์คํ๋๊ณ ์๋ ์ฃผ์/ํฌํธ๋ฅผ ์๋ฏธํ๋ค.
clientId : MQTT ๋ธ๋ก์ปค์ ์ ์ํ ๊ฒฝ์ฐ ClientId๋ฅผ ๊ธฐ์ ํด์ผ ํ๋ค. ์ค๋ณต๋ Id๋ก๋ ์ ์์ด ๋ถ๊ฐํ๋ค.
2. mqttClientFactory
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{brokerUrl});
options.setCleanSession(true);
factory.setConnectionOptions(options);
return factory;
}
Factory : ๊ฐ์ฒด๋ฅผ ๋ง๋ค์ด ์ฃผ๋ ์ญํ ์ ํ๋ ์ผ์ข ์ ์์ฐ์ ํด๋์ค.
Spring Integration์์๋ MQTT๋ฅผ ๋ค๋ฃฐ๋ ๋ด๋ถ์ ์ผ๋ก Paho ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ฌ์ฉํ๋ฏ๋ก Paho Client๋ฅผ ์์ฑํ๋ ค๋ฉด ์ค์ ์ด ํ์ํจ.
MqttConnectOptions -> ์ค์ . ์ค์ ์ ๋ด์๋๋ ์์ฐ์ -> Factory
๊ตฌ์ฒด์ ์ธ ์ต์ ์ ๊ดํ๊ฑด ์์ ๋งํฌ๋ฅผ ์ฐธ์กฐํ๋ฉด ๋๋ค.
3. mqttOutbound(Handler)
@Bean
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler(clientId, mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);
return messageHandler;
}
MQTT ๋ฉ์์ง๋ฅผ Mosquitto์ ๋ณด๋ด๋ ์ญํ ์ ํ๋ ํธ๋ค๋ฌ.
Integration Flow์์ .handle(mqttOutbound())๋ก ์ฌ์ฉ๋๋ค.
4. mqttOutboundFlow(Integration Flow)
@Bean
public IntegrationFlow mqttOutboundFlow() {
return IntegrationFlow.from("mqttOutboundChannel")
.handle(mqttOutbound())
.get();
}
mqttOutboundChannel์ ๋ํ IntegrationFlow๋ฅผ ๊ด๋ฆฌํ๋ค.
service
@Service
@RequiredArgsConstructor
public class MqttPublisherService {
private final MessageChannel mqttOutboundChannel;
public void publish(String topic, String payload) {
Message<String> message = MessageBuilder.withPayload(payload)
.setHeader(MqttHeaders.TOPIC, topic)
.build();
mqttOutboundChannel.send(message);
}
}
mqttOutboundChannel์ Spring Integration ๋ด๋ถ์์ ์ฌ์ฉ๋๋ MessageChannel์ ์ด๋ฆ์ด๋ค. Spring์ด ์ด ์ด๋ฆ์ผ๋ก ๋ฉ์์ง ๋ผ์ฐํ ์ ํ ์ ์๋๋ก ํ๋ค.
QOS
MQTT์์ ๋ฉ์์ง ์ ์ก ํ์ง ์์ค์ ์ ํ๋ ์ค์ ์ผ๋ก
0 : At most once (์ต๋ ํ๋ฒ) -> ์คํจํด๋ ์ฌ์ ์ก X
1 : At least once (์ต์ํ๋ฒ) -> ์ฑ๊ณตํ ๋๊น์ง ์ฌ์ ์ก (์ค๋ณต๊ฐ๋ฅ)
2 : Exactly once (์ ํํ ํ๋ฒ) -> ๊ฐ์ฅ ์์ ํ์ง๋ง ๋๋ฆผ.
public void publish(String topic, String payload) {
Message<String> message = MessageBuilder.withPayload(payload)
.setHeader(MqttHeaders.TOPIC, topic)
.setHeader(MqttHeaders.QOS, 0)
.build();
mqttOutboundChannel.send(message);
}
Spring ์ฌ์ฉ์ ์ด ๋ถ๋ถ์ ๋ค์ด๊ฐ๊ฒ ๋๋ค.