読者です 読者をやめる 読者になる 読者になる

.logbook

学んだことを書き綴る、言わば航海日誌です。

mruby-mqttを使ってクリーン・セッション(Clean Session)を試す

MQTT mruby

以前の記事*1*2にてMQTTブローカ、クライアントを作成し、動作確認を実施した。本記事ではこれらを使用してMQTTのクリーン・セッションの動作を確認する過程を記録する。

クリーン・セッションとは

MQTTにおけるクリーン・セッションの仕様はIBMのサイト内に記述されている。

IBM Knowledge Center

簡単に説明すると、MQTTクライアントとサーバ間の接続が切れた場合において、切断前の情報(設定したtopic等)を「消去する」か「残しておく」かを選択可能な機能である。クリーン・セッションをONすると、ブローカが保持しているクライアントの情報は切断時に消去される。

クリーン・セッションがOFFの場合、ブローカが保持している情報は削除されない。よって、再接続時に再度subscribeすることなく、メッセージを受信できる。また、subscribeのQoSを1以上としておけば、切断中にpublishされたメッセージを再接続時に受信可能である。

f:id:ylgbk:20150412153242p:plain

f:id:ylgbk:20150412153249p:plain

f:id:ylgbk:20150412153256p:plain

クリーン・セッションの動作を確認する

受信側クライアントと送信側クライアントのプログラムを作成した。

subscribe.rb(受信側)

MQTTブローカへ接続後すぐに切断し、その後再接続するプログラムである。

初回接続時にsbscribeしているものの、2回目の接続時には行っていない。クリーン・セッションを無効化しているため、MQTTブローカには初回接続時のsubscribe情報が残っている。

# 再接続フラグ
start_retry = false

# MQTTブローカへconnectする
client = MQTTClient.connect("192.168.11.150:1883", "client_a") do |c|
  # クリーン・セッションを無効化する
  c.clean_session = false

  # connectに成功したらMQTTブローカへsubscribeする
  c.on_connect = -> {
    puts "connect."
    c.subscribe("/programming/ruby")
    # 一旦切断する
    c.disconnect
  }
  # disconnectに成功したらメッセージ出力する
  c.on_disconnect = -> {
    puts "disconnect."

    # 再接続フラグをON
    start_retry = true
  }
end

# 再接続を待つ
while(!start_retry) do
  sleep 1
end

# 再接続する
client = MQTTClient.connect("192.168.11.150:1883", "client_a") do |c|
  # メッセージ受信したら表示する
  # (コールバックとして登録)
  c.on_message = -> (message) {
    puts "#{message.topic}: #{message.payload}"
  }
  # connectに成功したらメッセージ出力する
  c.on_connect = -> {
    puts "reconnect."
  }
end

# イベントを待つ
while(true) do
  sleep 5
end

publish.rb(送信側)

QoS0~2のメッセージをpublishするプログラムである。テストも兼ねて、別のtopic(/programming/java)のメッセージもpublishする。

# MQTTブローカへ接続
client = MQTTClient.connect("192.168.11.150:1883", "client_b") do |c|
  # クリーン・セッションを無効化する
  c.clean_session = false

  # 接続したらメッセージを配信する
  c.on_connect   = -> {
    c.publish("/programming/ruby", "QoS0 message on /programming/ruby.", qos:0)
    c.publish("/programming/ruby", "QoS1 message on /programming/ruby.", qos:1)
    c.publish("/programming/ruby", "QoS2 message on /programming/ruby.", qos:2)
    c.publish("/programming/java", "QoS2 message on /programming/java.", qos:2)
  }
end

# イベントを待つ
while(true) do
  sleep 5
end

確認結果

実際に動作確認してみる。

$ mruby subscribe.rb 
connect.
disconnect.
reconnect.
(このタイミングで別Terminalを起動、「$ mruby publish.rb」を実行する)
/programming/ruby: QoS0 message on /programming/ruby.
/programming/ruby: QoS1 message on /programming/ruby.
/programming/ruby: QoS2 message on /programming/ruby.

このようにクリーン・セッションがOFFであれば、受信側が切断→再接続した場合に再度subscribeする必要は無い。

念のため、「subscribe.rb」の「c.clean_session = false」を「c.clean_session = true」に変更してみた。

$ mruby subscribe.rb 
connect.
disconnect.
reconnect.
(このタイミングで別Terminalを起動、「$ mruby publish.rb」を実行する)
(何も出力されない)

クリーン・セッションがONである場合、切断時に全ての情報が削除される。初回接続時のtopic情報がブローカに残っておらず、メッセージは受信しない。

受信側が切断中にpublishされたメッセージ

クリーン・セッションがOFFの場合、QoSを1以上としておけば切断中にpublishされたメッセージを再接続時に受信可能である。

これを確認すべく受信側プログラムを改造した。

# 再接続フラグ
start_retry = false

# mqttブローカへconnectする
client = mqttclient.connect("192.168.11.150:1883", "client_a") do |c|
  # クリーン・セッションを無効化する
  c.clean_session = false

  # connectに成功したらmqttブローカへsubscribeする
  c.on_connect = -> {
    puts "connect."
    c.subscribe("/programming/ruby", qos:1)
    # 一旦切断する
    c.disconnect
  }
  # disconnectに成功したらメッセージ出力する
  c.on_disconnect = -> {
    puts "disconnect."

    # 再接続フラグをon
    start_retry = true
  }
end

# 再接続を待つ
while(!start_retry) do
  sleep 1
end

# 10秒待つ
sleep 10  # この間に別プログラムにてpublishすること

# 再接続する
client = mqttclient.connect("192.168.11.150:1883", "client_a") do |c|
  # メッセージ受信したら表示する
  # (コールバックとして登録)
  c.on_message = -> (message) {
    puts "#{message.topic}: #{message.payload}"
  }
  # connectに成功したらメッセージ出力する
  c.on_connect = -> {
    puts "reconnect."
  }
end

# イベントを待つ
while(true) do
  sleep 5
end

「sleep 10」の最中に、別Terminalにてpublish.rbを実行した。

$ mruby subscribe.rb 
connect.
disconnect.
reconnect.
/programming/ruby: QoS1 message on /programming/ruby.
/programming/ruby: QoS2 message on /programming/ruby.

このように切断中にpublishされたメッセージのうち、QoS1以上のものを再接続時に受信できた。