๊ฐœ๋ฐœ

[Server] Mac ํ™˜๊ฒฝ ๋กœ์ปฌ๋กœ Mosquitto, Spring Integration MQTT ์‚ฌ์šฉ

rkawk 2025. 4. 1. 18:54

ํ”„๋กœ์ ํŠธ ๊ฐœ์š”

์•ฑ <-> ์„œ๋ฒ„ <-> ๋””๋ฐ”์ด์Šค๋กœ ๊ตฌ์„ฑ๋œ ํ”„๋กœ์ ํŠธ๋ฅผ ์ˆ˜ํ–‰ํ•˜๋˜ ์ค‘ ๋‹ค์ˆ˜์˜ ์‚ฌ์šฉ์ž๊ฐ€ ์•ฑ์—์„œ ๋ฐœํ–‰ํ•˜๋Š” ์ด๋ฒคํŠธ๋ฅผ ๋””๋ฐ”์ด์Šค์—์„œ ๋น ๋ฅด๊ณ  ๊ฐ€๋ณ๊ฒŒ ๋ฐ›์„ ์ˆ˜ ์žˆ๋„๋ก ํ•˜๊ธฐ ์œ„ํ•ด MQTT๋ฅผ ๋„์ž…ํ•˜์˜€๋‹ค.

 

 

 

  • Spring ์„œ๋ฒ„์— MQTT ๋ธŒ๋กœ์ปค ์„ค์น˜
    • Mosquitto, EMQX, HiveMQ ๋“ฑ MQTT ๋ธŒ๋กœ์ปค๋ฅผ ์„œ๋ฒ„์— ์„ค์น˜.
    • ํฌํŠธ(๋ณดํ†ต 1883, ๋˜๋Š” TLS์ผ ๊ฒฝ์šฐ 8883) ์—ด๊ธฐ.
    • MQTT ๋ธŒ๋กœ์ปค๊ฐ€ ์™ธ๋ถ€(๋””๋ฐ”์ด์Šค)์—์„œ ์ ‘๊ทผ ๊ฐ€๋Šฅํ•ด์•ผ ํ•˜๋ฏ€๋กœ ๋ฐฉํ™”๋ฒฝ/๋ณด์•ˆ ๊ทธ๋ฃน ์„ค์ • ํ•„์š”.
  • ๋””๋ฐ”์ด์Šค์—์„œ MQTT ํด๋ผ์ด์–ธํŠธ ๊ตฌํ˜„
    • ๋””๋ฐ”์ด์Šค์—์„œ MQTT ํด๋ผ์ด์–ธํŠธ๋ฅผ ์‹คํ–‰์‹œ์ผœ์„œ ์„œ๋ฒ„์˜ ๋ธŒ๋กœ์ปค์— ์—ฐ๊ฒฐ.
    • ์˜ˆ: mqtt://your.server.ip:1883
    • ํ† ํ”ฝ์„ ๊ตฌ๋…ํ•˜์—ฌ ์„œ๋ฒ„์—์„œ publishํ•˜๋Š” ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐ›์„ ์ˆ˜ ์žˆ๊ฒŒ ํ•จ.
  • Spring ์„œ๋ฒ„์—์„œ MQTT ๋ฉ”์‹œ์ง€ ๋ฐœํ–‰
    • ๋””๋ฐ”์ด์Šค๊ฐ€ ๊ตฌ๋… ์ค‘์ธ ํ† ํ”ฝ์œผ๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐœํ–‰ (publish).
    • Spring์—์„œ๋Š” Eclipse Paho ํด๋ผ์ด์–ธํŠธ๋ฅผ ์ด์šฉํ•˜๊ฑฐ๋‚˜, MQTT ์ง€์› ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ(e.g., spring-integration-mqtt)๋ฅผ ์ด์šฉ.

 

1. Mac ํ™˜๊ฒฝ์—์„œ Mosquitto ์„ค์น˜

์„ค์น˜

brew install mosquitto

 

 

์‹คํ–‰

brew services start mosquitto

 

conf๋กœ ์‹คํ–‰

/opt/homebrew/opt/mosquitto/sbin/mosquitto -c /opt/homebrew/etc/mosquitto/mosquitto.conf

 

 

2. Spring Integration MQTT 

 

https://docs.spring.io/spring-integration/reference/mqtt.html#mqtt-outbound

 

MQTT Support :: Spring Integration

If a single MQTT ClientID is required for several integrations, multiple MQTT client instances cannot be used because MQTT brokers may have a limitation on a number of connections per ClientID (typically, a single connection is allowed). For having a singl

docs.spring.io

MqttConnectOptions

https://eclipse.dev/paho/files/javadoc/org/eclipse/paho/client/mqttv3/MqttConnectOptions.html

 

MqttConnectOptions

Set a list of one or more serverURIs the client may connect to. Each serverURI specifies the address of a server that the client may connect to. Two types of connection are supported tcp:// for a TCP connection and ssl:// for a TCP connection secured by SS

eclipse.dev

 

 

 

build.gradle

// Spring Integration
implementation 'org.springframework.boot:spring-boot-starter-integration'

// Spring MQTT
implementation "org.springframework.integration:spring-integration-mqtt:6.4.3"

 

configuration

@Configuration
public class MqttOutboundConfig {

    @Value("${mqtt.url}")
    private String brokerUrl;

    @Value("${mqtt.client-id}")
    private String clientId;

    @Value("${mqtt.default-topic}")
    private String defaultTopic;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{brokerUrl});
        options.setCleanSession(true);
        factory.setConnectionOptions(options);

        return factory;
    }

    @Bean
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler(clientId, mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        return messageHandler;
    }

    @Bean
    public IntegrationFlow mqttOutboundFlow() {
        return IntegrationFlow.from("mqttOutboundChannel")
                .handle(mqttOutbound())
                .get();
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }


}

1. ๋ณ€์ˆ˜

@Value("${mqtt.url}")
private String brokerUrl;

@Value("${mqtt.client-id}")
private String clientId;

@Value("${mqtt.default-topic}")
private String defaultTopic;

brokerUrl : MQTT ๋ธŒ๋กœ์ปค๊ฐ€ ์‹คํ–‰๋˜๊ณ  ์žˆ๋Š” ์ฃผ์†Œ/ํฌํŠธ๋ฅผ ์˜๋ฏธํ•œ๋‹ค.

clientId : MQTT ๋ธŒ๋กœ์ปค์— ์ ‘์†ํ•  ๊ฒฝ์šฐ ClientId๋ฅผ ๊ธฐ์ž…ํ•ด์•ผ ํ•œ๋‹ค. ์ค‘๋ณต๋œ Id๋กœ๋Š” ์ ‘์†์ด ๋ถˆ๊ฐ€ํ•˜๋‹ค.

 

2. mqttClientFactory

 @Bean
public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    MqttConnectOptions options = new MqttConnectOptions();
    options.setServerURIs(new String[]{brokerUrl});
    options.setCleanSession(true);
    factory.setConnectionOptions(options);

    return factory;
}

 

 

Factory : ๊ฐ์ฒด๋ฅผ ๋งŒ๋“ค์–ด ์ฃผ๋Š” ์—ญํ• ์„ ํ•˜๋Š” ์ผ์ข…์˜ ์ƒ์‚ฐ์ž ํด๋ž˜์Šค.

Spring Integration์—์„œ๋Š” MQTT๋ฅผ ๋‹ค๋ฃฐ๋•Œ ๋‚ด๋ถ€์ ์œผ๋กœ Paho ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋ฅผ ์‚ฌ์šฉํ•˜๋ฏ€๋กœ Paho Client๋ฅผ ์ƒ์„ฑํ•˜๋ ค๋ฉด ์„ค์ •์ด ํ•„์š”ํ•จ.

MqttConnectOptions -> ์„ค์ •. ์„ค์ •์„ ๋‹ด์•„๋†“๋Š” ์ƒ์‚ฐ์ž -> Factory

 

๊ตฌ์ฒด์ ์ธ ์˜ต์…˜์— ๊ด€ํ•œ๊ฑด ์œ„์˜ ๋งํฌ๋ฅผ ์ฐธ์กฐํ•˜๋ฉด ๋œ๋‹ค.

 

3. mqttOutbound(Handler)

@Bean
public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler messageHandler =
            new MqttPahoMessageHandler(clientId, mqttClientFactory());
    messageHandler.setAsync(true);
    messageHandler.setDefaultTopic(defaultTopic);
    return messageHandler;
}

MQTT ๋ฉ”์‹œ์ง€๋ฅผ Mosquitto์— ๋ณด๋‚ด๋Š” ์—ญํ• ์„ ํ•˜๋Š” ํ•ธ๋“ค๋Ÿฌ.

Integration Flow์—์„œ .handle(mqttOutbound())๋กœ ์‚ฌ์šฉ๋œ๋‹ค.

 

4. mqttOutboundFlow(Integration Flow)

@Bean
public IntegrationFlow mqttOutboundFlow() {
    return IntegrationFlow.from("mqttOutboundChannel")
            .handle(mqttOutbound())
            .get();
}

mqttOutboundChannel์— ๋Œ€ํ•œ IntegrationFlow๋ฅผ ๊ด€๋ฆฌํ•œ๋‹ค.

 

service

@Service
@RequiredArgsConstructor
public class MqttPublisherService {

    private final MessageChannel mqttOutboundChannel;

    public void publish(String topic, String payload) {
        Message<String> message = MessageBuilder.withPayload(payload)
                .setHeader(MqttHeaders.TOPIC, topic)
                .build();
        mqttOutboundChannel.send(message);
    }
}

 

 

mqttOutboundChannel์€ Spring Integration ๋‚ด๋ถ€์—์„œ ์‚ฌ์šฉ๋˜๋Š” MessageChannel์˜ ์ด๋ฆ„์ด๋‹ค. Spring์ด ์ด ์ด๋ฆ„์œผ๋กœ ๋ฉ”์‹œ์ง€ ๋ผ์šฐํŒ…์„ ํ•  ์ˆ˜ ์žˆ๋„๋ก ํ•œ๋‹ค.

 

QOS

MQTT์—์„œ ๋ฉ”์‹œ์ง€ ์ „์†ก ํ’ˆ์งˆ ์ˆ˜์ค€์„ ์ •ํ•˜๋Š” ์„ค์ •์œผ๋กœ

0 : At most once (์ตœ๋Œ€ ํ•œ๋ฒˆ) -> ์‹คํŒจํ•ด๋„ ์žฌ์ „์†ก X

1 : At least once (์ตœ์†Œํ•œ๋ฒˆ) -> ์„ฑ๊ณตํ•  ๋•Œ๊นŒ์ง€ ์žฌ์ „์†ก (์ค‘๋ณต๊ฐ€๋Šฅ)

2 : Exactly once (์ •ํ™•ํžˆ ํ•œ๋ฒˆ) -> ๊ฐ€์žฅ ์•ˆ์ „ํ•˜์ง€๋งŒ ๋А๋ฆผ.

 

public void publish(String topic, String payload) {
    Message<String> message = MessageBuilder.withPayload(payload)
            .setHeader(MqttHeaders.TOPIC, topic)
            .setHeader(MqttHeaders.QOS, 0)
            .build();
    mqttOutboundChannel.send(message);
}

Spring ์‚ฌ์šฉ์‹œ ์ด ๋ถ€๋ถ„์— ๋“ค์–ด๊ฐ€๊ฒŒ ๋œ๋‹ค.