/__w/emqttsn/emqttsn/_build/test/cover/ct/emqttsn_state.html

1 %%-------------------------------------------------------------------------
2 %% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%-------------------------------------------------------------------------
16
17 %% @doc state machine handler for MQTT-SN client
18 %%
19 %% @private
20 -module(emqttsn_state).
21 -behavior(gen_statem).
22
23 -include("emqttsn.hrl").
24 -include("logger.hrl").
25 -include_lib("stdlib/include/assert.hrl").
26
27 -export([init/1, callback_mode/0, start_link/2, handle_event/4]).
28
29
30
31 -spec callback_mode() -> gen_statem:callback_mode_result().
32 callback_mode() ->
33 36 [handle_event_function, state_enter].
34
35 -spec start_link(string(), config()) -> {ok, pid()} | {error, term()}.
36 start_link(Name, Config) ->
37 36 #config{send_port = Port} = Config,
38 36 case gen_statem:start_link({global, Name}, ?MODULE, {Name, Port, Config}, []) of
39 {'ok', Pid} ->
40 36 {ok, Pid};
41 'ignore' ->
42
:-(
?LOGP(error, "gen_statem starting process returns ignore"),
43
:-(
{error, ignore};
44 {error, Reason} ->
45
:-(
?LOGP(error, "gen_statem starting process failed, reason: ~p", [Reason]),
46
:-(
{error, Reason}
47 end.
48
49
50 -spec init({string(), inet:port_number(), config()}) -> {ok, atom(), state()} | {stop, term()}.
51 init({Name, Port, Config}) ->
52 27 emqttsn_utils:init_global_store(Name),
53 27 case emqttsn_udp:init_port(Port) of
54 {error, Reason} ->
55
:-(
?LOGP(error, "port init failed, reason: ~p", [Reason]),
56
:-(
{stop, Reason};
57 {ok, Socket} ->
58 27 {ok, initialized, #state{name = Name, socket = Socket, config = Config}}
59 end.
60
61 %-------------------------------------------------------------------------------
62 % Client is disconnected or before connect
63 %-------------------------------------------------------------------------------
64
65 %%------------------------------------------------------------------------------
66 %% @doc Find the Host of service gateway
67 %%
68 %% state : [initialized] -> [found]
69 %% trigger: enter state
70
71 %% gen_statem for state machine
72 %% @end
73 %%------------------------------------------------------------------------------
74
75 -spec handle_event(atom(), term(), atom(), state()) -> gen_statem:event_handler_result(state()).
76 handle_event(enter, _OldState, initialized,
77 State = #state{config = Config, socket = Socket}) ->
78 54 ?LOG_STATE(debug, "Find the Host of service gateway", [], State),
79 54 #config{search_gw_interval = Interval} = Config,
80 54 case emqttsn_send:broadcast_searchgw(Config, Socket, ?DEFAULT_PORT, ?DEFAULT_RADIUS) of
81 ok ->
82 54 {keep_state, State, {state_timeout, Interval, {}}};
83 {error, Reason} ->
84
:-(
?LOG_STATE(warning, "Boardcast SearchGw failed: ~p", [Reason], State),
85
:-(
{keep_state, State, {state_timeout, Interval, {}}}
86 end;
87
88 %%------------------------------------------------------------------------------
89 %% @doc Resend searchgw when reach time interval T_SEARCHGW
90 %%
91 %% state : repeat [initialized]
92 %% trigger: state timeout
93
94 %% gen_statem for state machine
95 %% T_SEARCHGW
96 %% @end
97 %%------------------------------------------------------------------------------
98
99 handle_event(state_timeout, {}, initialized, State = #state{config = Config}) ->
100 3 #config{search_gw_interval = Interval} = Config,
101 3 ?LOG_STATE(debug, "Resend searchgw when reach time interval T_SEARCHGW ~p", [Interval], State),
102 3 {repeat_state, State};
103
104 %%------------------------------------------------------------------------------
105 %% @doc Fetch gateway from received broadcast ADVERTISE packet
106 %%
107 %% state : keep [initialized]
108 %% trigger: receive advertise packet
109 %%
110 %% gen_statem for state machine
111 %% @end
112 %%------------------------------------------------------------------------------
113
114 handle_event(cast, {?ADVERTISE_PACKET(GateWayId, _Duration), Host, Port},
115 _StateName, State = #state{name = Name}) ->
116 1 ?LOG_STATE(notice, "Fetch gateway id ~p at ~p:~p from received broadcast ADVERTISE packet",
117
:-(
[GateWayId, Host, Port], State),
118 1 emqttsn_utils:store_gw(Name, #gw_info{id = GateWayId, host = Host,
119 port = Port, from = ?BROADCAST}),
120 1 {keep_state, State};
121
122 %%------------------------------------------------------------------------------
123 %% @doc Fetch gateway from received broadcast GWINFO packet by gateway
124 %%
125 %% state : keep [initialized]
126 %% trigger: receive gwinfo packet
127 %%
128 %% gen_statem for state machine
129 %% @end
130 %%------------------------------------------------------------------------------
131
132 handle_event(cast, {?GWINFO_PACKET(GateWayId), Host, Port},
133 _StateName, State = #state{name = Name}) ->
134 2 ?LOG_STATE(notice, "Fetch gateway id ~p at ~p:~p from received broadcast GWINFO packet by gateway",
135
:-(
[GateWayId, Host, Port], State),
136 2 emqttsn_utils:store_gw(Name, #gw_info{id = GateWayId, host = Host,
137 port = Port, from = ?BROADCAST}),
138 2 {keep_state, State};
139
140 %%------------------------------------------------------------------------------
141 %% @doc Fetch gateway from received broadcast GWINFO packet by other client
142 %%
143 %% state : keep [initialized]
144 %% trigger: receive gwinfo packet
145 %%
146 %% gen_statem for state machine
147 %% use DEFAULT_PORT = 1884, maybe have mistake
148 %% @end
149 %%------------------------------------------------------------------------------
150
151 handle_event(cast, ?GWINFO_PACKET(GateWayId, GateWayAdd),
152 _StateName, State = #state{name = Name}) ->
153 1 ?LOG_STATE(notice, "Fetch gateway id ~p at ~p:~p from received broadcast GWINFO packet by client",
154
:-(
[GateWayId, GateWayAdd, ?DEFAULT_PORT], State),
155 1 emqttsn_utils:store_gw(Name, #gw_info{id = GateWayId, host = GateWayAdd,
156 port = ?DEFAULT_PORT,
157 from = ?PARAPHRASE}),
158 1 {keep_state, State};
159
160 %%------------------------------------------------------------------------------
161 %% @doc Request to add a new gateway
162 %%
163 %% state : keep [initialized]
164 %% trigger: manual call
165
166 %% gen_statem for state machine
167 %% @end
168 %%------------------------------------------------------------------------------
169
170 handle_event(cast, {add_gw, Host, Port, GateWayId}, _StateName,
171 State = #state{name = Name}) ->
172 19 ?LOG_STATE(notice, "Fetch gateway id ~p at ~p:~p from manual add",
173
:-(
[GateWayId, Host, Port], State),
174 19 emqttsn_utils:store_gw(Name, #gw_info{id = GateWayId, host = Host,
175 port = Port, from = ?MANUAL}),
176 19 {keep_state, State};
177
178 %%------------------------------------------------------------------------------
179 %% @doc Request to connect a exist gateway
180 %%
181 %% state : [initialized] -> [found]
182 %% trigger: manual call
183
184 %% gen_statem for state machine
185 %% @end
186 %%------------------------------------------------------------------------------
187
188 handle_event(cast, {connect, GateWayId}, initialized,
189 State = #state{name = Name, socket = Socket}) ->
190 19 ?LOG_STATE(debug, "Request to connect a exist gateway id ~p",
191 19 [GateWayId], State),
192 19 case emqttsn_utils:get_gw(Name, GateWayId) of
193 none ->
194
:-(
{keep_state, State};
195 #gw_info{host = Host, port = Port} ->
196 19 emqttsn_udp:connect(Socket, Host, Port),
197 19 {next_state, found,
198 State#state{socket = Socket, active_gw =
199 #gw_collect{id = GateWayId, host = Host, port = Port}}}
200 end;
201
202 %-------------------------------------------------------------------------------
203 % Client connecting process
204 %-------------------------------------------------------------------------------
205
206 %%------------------------------------------------------------------------------
207 %% @doc Connect to service gateway
208 %%
209 %% state : keep [found]
210 %% trigger: enter state
211
212 %% gen_statem for state machine
213 %% @end
214 %%------------------------------------------------------------------------------
215
216 handle_event(enter, _OldState, found,
217 State = #state{config = Config, socket = Socket}) ->
218 47 ?LOG_STATE(debug, "Connect to service gateway", [], State),
219 47 #config{will = Will, clean_session = CleanSession,
220 duration = Duration, client_id = ClientId,
221 ack_timeout = AckTimeout} = Config,
222 47 emqttsn_send:send_connect(Config, Socket, Will, CleanSession, Duration, ClientId),
223 47 {keep_state, State, {state_timeout, AckTimeout, {?RESEND_TIME_BEG}}};
224
225 %%------------------------------------------------------------------------------
226 %% @doc Found timeout to receive gateway response
227 %%
228 %% state : repeat [found]
229 %% trigger: state timeout + can resend
230 %%
231 %% state : [found] -> [connect_other]
232 %% trigger: state timeout + cannot resend
233 %%
234 %% gen_statem for state machine
235 %% @end
236 %%------------------------------------------------------------------------------
237
238 handle_event(state_timeout, {ResendTimes}, found,
239 State = #state{config = Config}) ->
240 17 #config{max_resend = MaxResend} = Config,
241 17 ?LOG_STATE(warning, "Found timeout to receive gateway response, retry: ~p/~p",
242
:-(
[ResendTimes], State),
243
244 17 if
245 ResendTimes < MaxResend ->
246 17 {repeat_state, State};
247 ResendTimes >= MaxResend ->
248
:-(
{next_state, connect_other, State}
249 end;
250
251 %%------------------------------------------------------------------------------
252 %% @doc Automatically answer for will_topic request
253 %%
254 %% state : keep [found]
255 %% trigger: receive will_topic_req packet
256
257 %% gen_statem for state machine
258 %% @end
259 %%------------------------------------------------------------------------------
260
261 handle_event(cast, ?WILLTOPICREQ_PACKET(), found,
262 State = #state{config = Config, socket = Socket}) ->
263 1 ?LOG_STATE(debug, "Automatically answer for will_topic request",
264 1 [], State),
265 1 #config{will_qos = Qos, will_topic = WillTopic} = Config,
266 1 Retain = false,
267 1 emqttsn_send:send_willtopic(Config, Socket, Qos, Retain, WillTopic),
268 1 {keep_state, State, {state_timeout, update, connect_ack}};
269
270 %%------------------------------------------------------------------------------
271 %% @doc Automatically answer for will_msg request
272 %%
273 %% state : keep [found]
274 %% trigger: receive will_msg_req packet
275
276 %% gen_statem for state machine
277 %% @end
278 %%------------------------------------------------------------------------------
279
280 handle_event(cast, ?WILLMSGREQ_PACKET(), found,
281 State = #state{config = Config, socket = Socket}) ->
282 1 ?LOG_STATE(debug, "Automatically answer for will_msg request",
283 1 [], State),
284 1 #config{will_msg = WillMsg} = Config,
285 1 emqttsn_send:send_willmsg(Config, Socket, WillMsg),
286 1 {keep_state, State, {state_timeout, update, connect_ack}};
287
288 %%------------------------------------------------------------------------------
289 %% @doc Gateway ensure connection is established
290 %%
291 %% state : [found] -> [connected]
292 %% trigger: receive connack packet and return code success
293
294 %% gen_statem for state machine
295 %% @end
296 %%------------------------------------------------------------------------------
297
298 handle_event(cast, ?CONNACK_PACKET(ReturnCode), found,
299 State = #state{config = Config})
300 when ReturnCode == ?RC_ACCEPTED ->
301 16 ?LOG_STATE(debug, "Gateway ensure connection is established", [], State),
302 16 #config{keep_alive = PingInterval} = Config,
303 16 {next_state, connected, State#state{gw_failed_cycle = 0},
304 {state_timeout, PingInterval, ping}};
305
306 %%------------------------------------------------------------------------------
307 %% @doc Connection is failed to establish
308 %%
309 %% state : keep [found]
310 %% trigger: receive connack packet and return code failed
311
312 %% gen_statem for state machine
313 %% @end
314 %%------------------------------------------------------------------------------
315
316 handle_event(cast, ?CONNACK_PACKET(ReturnCode), found,
317 State = #state{name = Name, socket = Socket, config = Config}) ->
318 1 ?LOG_STATE(error, "failed for connect response, return code: ~p",
319
:-(
[ReturnCode], State),
320 1 {next_state, initialized, #state{name = Name, socket = Socket, config = Config}};
321
322 %-------------------------------------------------------------------------------
323 % Client is waiting for response with qos resend
324 %-------------------------------------------------------------------------------
325
326 %%------------------------------------------------------------------------------
327 %% @doc Finish register request and back to connected
328 %%
329 %% state : [wait_reg] -> [connected]
330 %% trigger: receive regack packet
331
332 %% gen_statem for state machine
333 %% @end
334 %%------------------------------------------------------------------------------
335
336 handle_event(cast, ?REGACK_PACKET(TopicId, RemotePacketId, ReturnCode), wait_reg,
337 State = #state{next_packet_id = LocalPacketId,
338 waiting_data = {reg, TopicName},
339 topic_id_name = IdMap, topic_name_id = NameMap})
340 when RemotePacketId == LocalPacketId ->
341
342 6 case ReturnCode of
343 ?RC_ACCEPTED ->
344 5 ?LOG_STATE(debug, "Finish register topic id ~p and back to connected, packet id ~p",
345 5 [TopicId, RemotePacketId], State),
346 5 NewIdMap = dict:store(TopicId, TopicName, IdMap),
347 5 NewNameMap = dict:store(TopicName, TopicId, NameMap),
348 5 {next_state, connected,
349 State#state{next_packet_id = next_packet_id(RemotePacketId),
350 waiting_data = {}, topic_id_name = NewIdMap,
351 topic_name_id = NewNameMap}};
352 _ ->
353 1 ?LOG_STATE(error, "failed for register response, return code: ~p",
354
:-(
[ReturnCode], State),
355 1 {next_state, connected,
356 State#state{next_packet_id = next_packet_id(RemotePacketId),
357 waiting_data = {}, topic_id_name = IdMap,
358 topic_name_id = NameMap}}
359 end;
360
361
362 %%------------------------------------------------------------------------------
363 %% @doc Answer for register request is timeout and retry register
364 %%
365 %% state : keep [wait_reg]
366 %% trigger: state timeout + can resend
367 %%
368 %% state : [wait_reg] -> [connected]
369 %% trigger: state timeout + cannot resend
370 %%
371 %% gen_statem for state machine
372 %% @end
373 %%------------------------------------------------------------------------------
374
375 handle_event(state_timeout, {ResendTimes}, wait_reg,
376 State = #state{next_packet_id = PacketId, config = Config,
377 waiting_data = {reg, TopicName}, socket = Socket}) ->
378 6 #config{max_resend = MaxResend, resend_no_qos = WhetherResend,
379 ack_timeout = AckTimeout} = Config,
380 6 ?LOG_STATE(debug, "Answer for register topic name ~p is timeout
381 and retry register, packet id: ~p, retry: ~p/~p",
382 6 [TopicName, PacketId, ResendTimes, MaxResend], State),
383
384 6 if
385 WhetherResend andalso ResendTimes < MaxResend ->
386 5 emqttsn_send:send_register(Config, Socket, TopicName, PacketId),
387 5 {keep_state, State#state{next_packet_id = PacketId,
388 waiting_data = {reg, TopicName}},
389 {state_timeout, AckTimeout, {ResendTimes + 1}}};
390 not WhetherResend orelse ResendTimes >= MaxResend ->
391 1 {next_state, connected, State}
392 end;
393
394 %%------------------------------------------------------------------------------
395 %% @doc Finish subscribe request and back to connected
396 %%
397 %% state : [wait_sub] -> [connected]
398 %% trigger: receive suback packet
399 %%
400 %% gen_statem for state machine
401 %% @end
402 %%------------------------------------------------------------------------------
403
404 handle_event(cast,
405 ?SUBACK_PACKET(GrantQos, RemoteTopicId, RemotePacketId, ReturnCode),
406 wait_sub,
407 State = #state{next_packet_id = LocalPacketId, topic_id_name = IdMap,
408 topic_name_id = NameMap, topic_id_use_qos = QosMap,
409 config = Config,
410 waiting_data = {sub, TopicIdType, TopicIdOrName, _MaxQos}})
411 when RemotePacketId == LocalPacketId ->
412 8 ?LOG_STATE(debug, "Finish subscribe request and back to connected,
413 packet: ~p, topic: ~p, return_code: ~p, qos: ~p",
414 8 [RemotePacketId, TopicIdOrName, ReturnCode, GrantQos], State),
415 8 #config{recv_qos = LocalQos} = Config,
416 8 case ReturnCode of
417 ?RC_ACCEPTED ->
418 7 Qos = min(GrantQos, LocalQos),
419 7 NewQosMap = dict:store(RemoteTopicId, Qos, QosMap),
420 7 case TopicIdType of
421 ?SHORT_TOPIC_NAME ->
422 5 NewIdMap = dict:store(RemoteTopicId, TopicIdOrName, IdMap),
423 5 NewNameMap = dict:store(TopicIdOrName, RemoteTopicId, NameMap),
424 5 {next_state, connected,
425 State#state{next_packet_id = next_packet_id(LocalPacketId),
426 waiting_data = {}, topic_id_name = NewIdMap,
427 topic_name_id = NewNameMap, topic_id_use_qos = NewQosMap}};
428 _ ->
429 2 ?assertEqual(RemoteTopicId, TopicIdOrName),
430 2 {next_state, connected,
431 State#state{next_packet_id = next_packet_id(LocalPacketId),
432 waiting_data = {}, topic_id_use_qos = NewQosMap}}
433 end;
434 _ ->
435 1 ?LOG_STATE(error, "failed for subscribe response, return_code: ~p",
436
:-(
[ReturnCode], State),
437 1 {next_state, connected,
438 State#state{next_packet_id = next_packet_id(LocalPacketId),
439 waiting_data = {}}}
440 end;
441
442 %%------------------------------------------------------------------------------
443 %% @doc Answer for subscribe request is timeout and retry subscribe
444 %%
445 %% state : keep [wait_sub]
446 %% trigger: state timeout + can resend
447 %%
448 %% state : [wait_sub] -> [connected]
449 %% trigger: state timeout + cannot resend
450 %%
451 %% gen_statem for state machine
452 %% @end
453 %%------------------------------------------------------------------------------
454
455 handle_event(state_timeout, {ResendTimes}, wait_sub,
456 State =
457 #state{next_packet_id = PacketId, config = Config, socket = Socket,
458 waiting_data = {sub, TopicIdType, TopicIdOrName, MaxQos}}) ->
459 6 #config{max_resend = MaxResend, resend_no_qos = WhetherResend,
460 ack_timeout = AckTimeout} = Config,
461 6 ?LOG_STATE(warning, "Answer for subscribe request is timeout and retry subscribe, retry: ~p/~p",
462
:-(
[ResendTimes, MaxResend], State),
463 6 if
464 WhetherResend andalso ResendTimes < MaxResend ->
465 5 emqttsn_send:send_subscribe(Config, Socket, true, TopicIdType,
466 TopicIdOrName, MaxQos, PacketId),
467 5 {keep_state,
468 State#state{next_packet_id = PacketId,
469 waiting_data = {sub, TopicIdType, TopicIdOrName, MaxQos}},
470 {state_timeout, AckTimeout, {ResendTimes + 1}}};
471 not WhetherResend orelse ResendTimes >= MaxResend ->
472 1 {next_state, connected, State}
473 end;
474
475 %%------------------------------------------------------------------------------
476 %% @doc Finish unsubscribe request and back to connected
477 %%
478 %% state : [wait_unsub] -> [connected]
479 %% trigger: receive unsuback packet
480 %%
481 %% gen_statem for state machine
482 %% @end
483 %%------------------------------------------------------------------------------
484
485 handle_event(cast,
486 ?UNSUBACK_PACKET(RemotePacketId),
487 wait_unsub,
488 State = #state{next_packet_id = LocalPacketId,
489 waiting_data = {unsub, TopicIdType, TopicIdOrName}})
490 when RemotePacketId == LocalPacketId ->
491 1 ?LOG_STATE(debug, "Finish unsubscribe request and back to connected,
492 1 packet: ~p, topic type: ~p, topic: ~p", [RemotePacketId, TopicIdType, TopicIdOrName], State),
493 1 {next_state, connected, State#state{next_packet_id = next_packet_id(LocalPacketId),
494 waiting_data = {}}};
495
496 %%------------------------------------------------------------------------------
497 %% @doc Answer for unsubscribe request is timeout and retry unsubscribe
498 %%
499 %% state : keep [wait_unsub]
500 %% trigger: state timeout + can resend
501 %%
502 %% state : [wait_unsub] -> [connected]
503 %% trigger: state timeout + cannot resend
504 %%
505 %% gen_statem for state machine
506 %% @end
507 %%------------------------------------------------------------------------------
508
509 handle_event(state_timeout, {ResendTimes}, wait_unsub, State =
510 #state{next_packet_id = PacketId, config = Config, socket = Socket,
511 waiting_data = {unsub, TopicIdType, TopicIdOrName}}) ->
512 6 #config{max_resend = MaxResend, resend_no_qos = WhetherResend,
513 ack_timeout = AckTimeout} = Config,
514 6 ?LOG_STATE(warning, "Answer for unsubscribe request is timeout and retry subscribe, retry: ~p/~p",
515
:-(
[ResendTimes, MaxResend], State),
516 6 if
517 WhetherResend andalso ResendTimes < MaxResend ->
518 5 emqttsn_send:send_unsubscribe(Config, Socket, TopicIdType, TopicIdOrName, PacketId),
519 5 {keep_state,
520 State#state{next_packet_id = PacketId,
521 waiting_data = {unsub, TopicIdType, TopicIdOrName}},
522 {state_timeout, AckTimeout, {ResendTimes + 1}}};
523 not WhetherResend orelse ResendTimes >= MaxResend ->
524 1 {next_state, connected, State}
525 end;
526 %%------------------------------------------------------------------------------
527 %% @doc Finish publish request and back to connected when at QoS 1
528 %%
529 %% state : [wait_pub_qos1] -> [connected]
530 %% trigger: receive puback packet
531 %%
532 %% gen_statem for state machine
533 %% QoS 1
534 %% @end
535 %%------------------------------------------------------------------------------
536
537 handle_event(cast, ?PUBACK_PACKET(RemoteTopicId, RemotePacketId, ReturnCode),
538 wait_pub_qos1,
539 State = #state{next_packet_id = LocalPacketId,
540 config = Config, topic_id_name = Map,
541 waiting_data = {pub, ?QOS_1, TopicIdType,
542 LocalTopicIdOrName, Message}})
543 when RemotePacketId == LocalPacketId ->
544 2 ?LOG_STATE(debug, "Finish publish request and back to connected
545 when at QoS 1, packet: ~p, return_code: ~p",
546 2 [RemotePacketId, ReturnCode], State),
547 2 #config{max_message_each_topic = TopicMaxMsg} = Config,
548 2 NewMap = case TopicIdType of
549 ?PRE_DEF_TOPIC_ID ->
550
:-(
?assertEqual(RemoteTopicId, LocalTopicIdOrName),
551
:-(
Map;
552 ?TOPIC_ID ->
553
:-(
?assertEqual(RemoteTopicId, LocalTopicIdOrName),
554
:-(
Map;
555 ?SHORT_TOPIC_NAME ->
556 2 dict:store(RemoteTopicId, LocalTopicIdOrName, Map)
557 end,
558 2 if
559 ReturnCode =/= ?RC_ACCEPTED
560 ->
561 1 ?LOG_STATE(error, "Failed for publish response, return code: ~p",
562
:-(
[ReturnCode], State),
563 1 {next_state, connected,
564 State#state{next_packet_id = next_packet_id(LocalPacketId),
565 topic_id_name = NewMap}};
566 ReturnCode =:= ?RC_ACCEPTED
567 ->
568 1 NewState = emqttsn_utils:store_msg(State, RemoteTopicId, TopicMaxMsg, Message),
569 1 {next_state, connected,
570 NewState#state{next_packet_id = next_packet_id(LocalPacketId),
571 topic_id_name = NewMap}}
572 end;
573
574
575 %%------------------------------------------------------------------------------
576 %% @doc Answer for publish request is timeout and retry publish at QoS 1
577 %%
578 %% state : keep [wait_pub_qos1]
579 %% trigger: state timeout + can resend
580 %%
581 %% state : [wait_pub_qos1] -> [connected]
582 %% trigger: state timeout + cannot resend
583 %%
584 %% gen_statem for state machine
585 %% QoS 1
586 %% @end
587 %%------------------------------------------------------------------------------
588
589 handle_event(state_timeout, {Retain, ResendTimes}, wait_pub_qos1,
590 State = #state{next_packet_id = PacketId, config = Config,
591 socket = Socket,
592 waiting_data = {pub, ?QOS_1, TopicIdType,
593 TopicIdOrName, Message}}) ->
594 6 #config{max_resend = MaxResend, ack_timeout = AckTimeout} = Config,
595 6 ?LOG_STATE(warning, "Answer for publish request is timeout
596 and retry publish at QoS 1, retain: ~p, retry: ~p/~p",
597
:-(
[Retain, ResendTimes, MaxResend], State),
598
599 6 if
600 ResendTimes < MaxResend ->
601 5 emqttsn_send:send_publish(Config, Socket, ?QOS_1, ?DUP_TRUE, Retain, TopicIdType,
602 TopicIdOrName, Message, PacketId),
603 5 {keep_state, State#state{next_packet_id = PacketId,
604 waiting_data = {pub, ?QOS_1, TopicIdType,
605 TopicIdOrName, Message}},
606 {state_timeout, AckTimeout, {Retain, ResendTimes + 1}}};
607 ResendTimes >= MaxResend ->
608 1 {next_state, connected, State#state{waiting_data = {}}}
609 end;
610
611 %%------------------------------------------------------------------------------
612 %% @doc Continue publish request part 2 - receive pubrec and send pubrel
613 %% and then transfer to wait pubrel packet when at QoS 2
614 %%
615 %% state : [wait_pub_qos1] -> [wait_pubrel_qos2]
616 %% trigger: receive puback packet
617 %%
618 %% gen_statem for state machine
619 %% QoS 2
620 %% @end
621 %%------------------------------------------------------------------------------
622
623 handle_event(cast, ?PUBREC_PACKET(RemotePacketId), wait_pub_qos2,
624 State = #state{next_packet_id = LocalPacketId, config = Config,
625 topic_name_id = NameMap, socket = Socket,
626 waiting_data = {pub, ?QOS_2, TopicIdType,
627 TopicIdOrName, Message}})
628 when RemotePacketId == LocalPacketId ->
629 1 ?LOG_STATE(debug, "Continue publish request part 2, packet id: ~p",
630 1 [RemotePacketId], State),
631 1 TopicId = get_topic_id(TopicIdType, TopicIdOrName, NameMap),
632 1 #config{ack_timeout = AckTimeout,
633 max_message_each_topic = TopicMaxMsg} = Config,
634 1 NewState = emqttsn_utils:store_msg(State, TopicId, TopicMaxMsg, Message),
635 1 emqttsn_send:send_pubrel(Config, Socket, RemotePacketId),
636 1 {next_state, wait_pubrel_qos2,
637 NewState#state{next_packet_id = next_packet_id(RemotePacketId),
638 waiting_data = {pubrel, ?QOS_2}},
639 {state_timeout, AckTimeout, {?RESEND_TIME_BEG}}};
640
641 %%------------------------------------------------------------------------------
642 %% @doc Answer for publish request is timeout at part 2 - receive pubrec
643 %% and then retry publish at QoS 2
644 %%
645 %% state : keep [wait_pub_qos2]
646 %% trigger: state timeout + can resend
647 %%
648 %% state : [wait_pub_qos1] -> [connected]
649 %% trigger: state timeout + cannot resend
650 %%
651 %% gen_statem for state machine
652 %% QoS 2
653 %% @end
654 %%------------------------------------------------------------------------------
655
656 handle_event(state_timeout, {Retain, ResendTimes}, wait_pub_qos2,
657 State = #state{next_packet_id = PacketId,
658 config = Config, socket = Socket,
659 waiting_data = {pub, ?QOS_2, TopicIdType,
660 TopicIdOrName, Message}}) ->
661
:-(
#config{max_resend = MaxResend, ack_timeout = AckTimeout} = Config,
662
:-(
?LOG_STATE(warning, "Answer for publish request is timeout
663 at part 2, retain: ~p, retry: ~p/~p",
664
:-(
[Retain, ResendTimes, MaxResend], State),
665
:-(
if
666 ResendTimes < MaxResend ->
667
:-(
emqttsn_send:send_publish(Config, Socket, ?QOS_2, ?DUP_TRUE, Retain,
668 TopicIdType, TopicIdOrName, Message, PacketId),
669
:-(
{keep_state,
670 State#state{next_packet_id = PacketId,
671 waiting_data = {pub, ?QOS_2, TopicIdType,
672 TopicIdOrName, Message}},
673 {state_timeout, AckTimeout, {Retain, ResendTimes + 1}}};
674 ResendTimes >= MaxResend ->
675
:-(
{next_state, connected, State#state{waiting_data = {}}}
676 end;
677
678 %%------------------------------------------------------------------------------
679 %% @doc Finish publish request part 3 - receive pubcomp
680 %% and then back to connected when at QoS 2
681 %%
682 %% state : [wait_pub_qos1] -> [wait_pubrel_qos2]
683 %% trigger: receive puback packet
684 %%
685 %% gen_statem for state machine
686 %% QoS 2
687 %% @end
688 %%------------------------------------------------------------------------------
689
690 handle_event(cast, ?PUBCOMP_PACKET(RemotePacketId), wait_pubrel_qos2,
691 State = #state{waiting_data = {pubrel, ?QOS_2}}) ->
692 1 ?LOG_STATE(debug, "Finish publish request part 3, packet id: ~p",
693 1 [RemotePacketId], State),
694 1 {next_state, connected,
695 State#state{next_packet_id = next_packet_id(RemotePacketId),
696 waiting_data = {}}};
697
698 %%------------------------------------------------------------------------------
699 %% @doc Answer for publish request is timeout at part 3 - receive pubcomp
700 %% and then retry pubrel at QoS 2
701 %%
702 %% state : keep [wait_pubrel_qos2]
703 %% trigger: state timeout + can resend
704 %%
705 %% state : [wait_pubrel_qos2] -> [connected]
706 %% trigger: state timeout + cannot resend
707 %%
708 %% gen_statem for state machine
709 %% QoS 2
710 %% @end
711 %%------------------------------------------------------------------------------
712
713 handle_event(state_timeout, {ResendTimes}, wait_pubrel_qos2,
714 State = #state{next_packet_id = PacketId,
715 config = Config, socket = Socket,
716 waiting_data = {pubrel, ?QOS_2}}) ->
717
:-(
#config{max_resend = MaxResend, ack_timeout = AckTimeout} = Config,
718
:-(
?LOG_STATE(warning, "Answer for publish request is timeout at part 3, retry: ~p/~p",
719
:-(
[ResendTimes, MaxResend], State),
720
721
:-(
if
722 ResendTimes < MaxResend ->
723
:-(
emqttsn_send:send_pubrec(Config, Socket, PacketId),
724
:-(
{keep_state,
725 State#state{next_packet_id = PacketId,
726 waiting_data = {pubrel, ?QOS_2}},
727 {state_timeout, AckTimeout, {ResendTimes + 1}}};
728 ResendTimes >= MaxResend ->
729
:-(
{next_state, connected, State#state{waiting_data = {}}}
730 end;
731
732 %%------------------------------------------------------------------------------
733 %% @doc Finish receive publish part 2 - receive pubrel and send pubcomp
734 %% and then back to connected when at QoS 2
735 %%
736 %% state : [wait_pubrec_qos2] -> [connected]
737 %% trigger: receive pubrel packet
738 %%
739 %% gen_statem for state machine
740 %% QoS 2
741 %% @end
742 %%------------------------------------------------------------------------------
743
744 handle_event(cast, ?PUBREL_PACKET(RemotePacketId), wait_pubrec_qos2,
745 State = #state{socket = Socket, config = Config,
746 next_packet_id = LocalPacketId})
747 when RemotePacketId == LocalPacketId ->
748
:-(
?LOG_STATE(debug, "Finish receive publish part 2, packet id: ~p",
749
:-(
[RemotePacketId], State),
750
:-(
emqttsn_send:send_pubcomp(Config, Socket, RemotePacketId),
751
:-(
{next_state, connected,
752 State#state{next_packet_id = next_packet_id(RemotePacketId)}};
753
754 %%------------------------------------------------------------------------------
755 %% @doc Answer for receive publish is timeout at part 2 - receive pubrel
756 %% and then retry pubrec at QoS 2
757 %%
758 %% state : keep [wait_pubrec_qos2]
759 %% trigger: state timeout + can resend
760 %%
761 %% state : [wait_pubrec_qos2] -> [connected]
762 %% trigger: state timeout + cannot resend
763 %%
764 %% gen_statem for state machine
765 %% QoS 2
766 %% @end
767 %%------------------------------------------------------------------------------
768
769 handle_event(state_timeout, {ResendTimes}, wait_pubrec_qos2,
770 State = #state{next_packet_id = PacketId,
771 config = Config, socket = Socket,
772 waiting_data = {FromStateName}}) ->
773
:-(
#config{max_resend = MaxResend, ack_timeout = AckTimeout} = Config,
774
:-(
?LOG_STATE(warning, "Answer for receive publish is timeout at part 2, retry: ~p/~p",
775
:-(
[ResendTimes, MaxResend], State),
776
777
:-(
if
778 ResendTimes < MaxResend ->
779
:-(
emqttsn_send:send_pubrec(Config, Socket, PacketId),
780
:-(
{keep_state, State#state{next_packet_id = PacketId},
781 {state_timeout, AckTimeout, {ResendTimes + 1}}};
782 ResendTimes >= MaxResend ->
783
:-(
{next_state, FromStateName, State#state{waiting_data = {}}}
784 end;
785
786 %%------------------------------------------------------------------------------
787 %% @doc Finish ping request and back to connected
788 %%
789 %% state : [wait_pingreq] -> [connected]
790 %% trigger: receive pingresp packet
791 %%
792 %% gen_statem for state machine
793 %% @end
794 %%------------------------------------------------------------------------------
795
796 handle_event(cast, ?PINGRESP_PACKET(), wait_pingreq, State) ->
797
:-(
?LOG_STATE(debug, "Finish ping request and back to connected", [], State),
798
:-(
{next_state, connected, State};
799
800 %%------------------------------------------------------------------------------
801 %% @doc Answer for ping request is timeout and retry pingreq
802 %%
803 %% state : keep [wait_pingreq]
804 %% trigger: state timeout + can resend
805 %%
806 %% state : [wait_pingreq] -> [connected]
807 %% trigger: state timeout + cannot resend
808 %%
809 %% gen_statem for state machine
810 %% @end
811 %%------------------------------------------------------------------------------
812
813 handle_event(state_timeout, {ResendTimes}, wait_pingreq,
814 State = #state{config = Config, socket = Socket}) ->
815
:-(
#config{max_resend = MaxResend, ack_timeout = AckTimeout} = Config,
816
:-(
?LOG_STATE(warning, "Answer for ping request is timeout and retry pingreq, retry: ~p/~p",
817
:-(
[ResendTimes, MaxResend], State),
818
:-(
if
819 ResendTimes < MaxResend ->
820
:-(
emqttsn_send:send_pingreq(Config, Socket),
821
:-(
{keep_state, State, {state_timeout, AckTimeout, {ResendTimes + 1}}};
822 ResendTimes >= MaxResend ->
823
:-(
{next_state, connect_other, State#state{waiting_data = {}}}
824 end;
825
826 %%------------------------------------------------------------------------------
827 %% @doc Finish asleep request and go to asleep
828 %%
829 %% state : [wait_asleep] -> [asleep]
830 %% trigger: receive pingresp packet
831 %%
832 %% gen_statem for state machine
833 %% @end
834 %%------------------------------------------------------------------------------
835
836 handle_event(cast, ?DISCONNECT_PACKET(), wait_asleep, State = #state{waiting_data = {sleep, Interval}}) ->
837 1 ?LOG_STATE(debug, "Finish asleep request and go to asleep", [], State),
838 1 {next_state, asleep, State, {state_timeout, Interval, ready_awake}};
839
840 %%------------------------------------------------------------------------------
841 %% @doc Answer for ping request is timeout and retry pingreq
842 %%
843 %% state : keep [wait_pingreq]
844 %% trigger: state timeout + can resend
845 %%
846 %% state : [wait_pingreq] -> [connected]
847 %% trigger: state timeout + cannot resend
848 %%
849 %% gen_statem for state machine
850 %% @end
851 %%------------------------------------------------------------------------------
852
853 handle_event(state_timeout, {ResendTimes}, wait_asleep,
854 State = #state{config = Config, socket = Socket, waiting_data = {sleep, Interval}}) ->
855
:-(
#config{max_resend = MaxResend, ack_timeout = AckTimeout} = Config,
856
:-(
?LOG_STATE(warning, "Answer for asleep request is timeout and retry asleep, retry: ~p/~p",
857
:-(
[ResendTimes, MaxResend], State),
858
:-(
if
859 ResendTimes < MaxResend ->
860
:-(
emqttsn_send:send_asleep(Config, Socket, Interval),
861
:-(
{keep_state, State, {state_timeout, AckTimeout, {ResendTimes + 1}}};
862 ResendTimes >= MaxResend ->
863
:-(
{next_state, connect_other, State#state{waiting_data = {}}}
864 end;
865
866 %-------------------------------------------------------------------------------
867 % Client is connected and ready for subscribe/publish
868 %-------------------------------------------------------------------------------
869
870 %%------------------------------------------------------------------------------
871 %% @doc Request Gateway to register and then wait for regack
872 %%
873 %% state : [connected] -> [wait_reg]
874 %% trigger: manual call
875
876 %% gen_statem for state machine
877 %% @end
878 %%------------------------------------------------------------------------------
879
880 handle_event(cast, {reg, TopicName}, connected,
881 State = #state{next_packet_id = PacketId,
882 socket = Socket, config = Config}) ->
883 7 ?LOG_STATE(debug, "Request Gateway to register and then wait for regack, topic name: ~p",
884 7 [TopicName], State),
885 7 #config{ack_timeout = AckTimeout} = Config,
886 7 emqttsn_send:send_register(Config, Socket, TopicName, PacketId),
887 7 {next_state, wait_reg,
888 State#state{waiting_data = {reg, TopicName}},
889 {state_timeout, AckTimeout, {?RESEND_TIME_BEG}}};
890
891 %%------------------------------------------------------------------------------
892 %% @doc Request Gateway to subscribe and then wait for suback
893 %%
894 %% state : [connected] -> [wait_sub]
895 %% trigger: manual call
896
897 %% gen_statem for state machine
898 %% @end
899 %%------------------------------------------------------------------------------
900
901 handle_event(cast, {sub, TopicIdType, TopicIdOrName, MaxQos}, connected,
902 State = #state{next_packet_id = PacketId,
903 socket = Socket, config = Config}) ->
904 9 ?LOG_STATE(debug, "Request Gateway to subscribe and then
905 wait for suback, type: ~p, topic: ~p",
906 9 [TopicIdType, TopicIdOrName], State),
907 9 #config{ack_timeout = AckTimeout} = Config,
908 9 emqttsn_send:send_subscribe(Config, Socket, false, TopicIdType, TopicIdOrName, MaxQos, PacketId),
909 9 {next_state, wait_sub, State#state{waiting_data = {sub, TopicIdType, TopicIdOrName, MaxQos}},
910 {state_timeout, AckTimeout, {?RESEND_TIME_BEG}}};
911
912 %%------------------------------------------------------------------------------
913 %% @doc Request Gateway to unsubscribe and then wait for unsuback
914 %%
915 %% state : [connected] -> [wait_unsub]
916 %% trigger: manual call
917
918 %% gen_statem for state machine
919 %% @end
920 %%------------------------------------------------------------------------------
921
922 handle_event(cast, {unsub, TopicIdType, TopicIdOrName}, connected,
923 State = #state{next_packet_id = PacketId,
924 socket = Socket, config = Config}) ->
925 2 ?LOG_STATE(debug, "Request Gateway to subscribe and then
926 wait for suback, type: ~p, topic: ~p",
927 2 [TopicIdType, TopicIdOrName], State),
928 2 #config{ack_timeout = AckTimeout} = Config,
929 2 emqttsn_send:send_unsubscribe(Config, Socket, TopicIdType, TopicIdOrName, PacketId),
930 2 {next_state, wait_unsub, State#state{waiting_data = {unsub, TopicIdType, TopicIdOrName}},
931 {state_timeout, AckTimeout, {?RESEND_TIME_BEG}}};
932 %%------------------------------------------------------------------------------
933 %% @doc Request Gateway to publish and then wait for
934 %% nothing at QoS 0
935 %% puback at QoS 1
936 %% pubrec at QoS 2
937 %%
938 %% state :
939 %% QoS 0: keep[connected]
940 %% QoS 1: [connected] -> [wait_pub_qos1]
941 %% QoS 2: [connected] -> [wait_pub_qos2]
942 %% trigger: manual call
943
944 %% gen_statem for state machine
945 %% @end
946 %%------------------------------------------------------------------------------
947
948 handle_event(cast, {pub, Retain, TopicIdType, TopicIdOrName, Message}, connected,
949 State = #state{next_packet_id = PacketId, socket = Socket,
950 topic_name_id = NameMap, config = Config}) ->
951 8 ?LOG_STATE(debug, "Request Gateway to publish, type: ~p, topic: ~p, return code: ~p",
952 8 [TopicIdType, TopicIdOrName, Retain], State),
953 8 #config{ack_timeout = AckTimeout,
954 max_message_each_topic = TopicMaxMsg, pub_qos = Qos} = Config,
955 8 TopicId = get_topic_id(TopicIdType, TopicIdOrName, NameMap),
956 8 emqttsn_send:send_publish(Config, Socket, Qos, ?DUP_FALSE, Retain,
957 TopicIdType, TopicIdOrName, Message, PacketId),
958
959 8 case Qos of
960 ?QOS_0 ->
961 4 NewState = emqttsn_utils:store_msg(State, TopicId, TopicMaxMsg, Message),
962 4 {keep_state, NewState};
963 3 ?QOS_1 -> {next_state, wait_pub_qos1,
964 State#state{waiting_data = {pub, ?QOS_1, TopicIdType,
965 TopicIdOrName, Message}},
966 {state_timeout, AckTimeout, {Retain, ?RESEND_TIME_BEG}}};
967 1 ?QOS_2 -> {next_state, wait_pub_qos2,
968 State#state{waiting_data = {pub, ?QOS_2, TopicIdType,
969 TopicIdOrName, Message}},
970 {state_timeout, AckTimeout, {Retain, ?RESEND_TIME_BEG}}}
971 end;
972
973 %%------------------------------------------------------------------------------
974 %% @doc Receive publish request from other clients and then wait for
975 %% nothing at QoS 0/1
976 %% pubrel at QoS 2
977 %%
978 %% state :
979 %% QoS 0/1: keep [connected]
980 %% QoS 2 : [connected] -> [wait_pubrec_qos2]
981 %% trigger: receive publish packet
982 %%
983 %% gen_statem for state machine
984 %% @end
985 %%------------------------------------------------------------------------------
986
987 handle_event(cast, Packet = ?PUBLISH_PACKET(_RemoteDup, _RemoteQos,
988 _RemoteRetain, _TopicIdType,
989 _TopicId, _PacketId, _Message),
990 connected, State) ->
991 6 ?LOG_STATE(debug, "Receive publish request from other clients", [], State),
992 6 recv_publish(Packet, State, connected);
993
994 %%------------------------------------------------------------------------------
995 %% @doc Receive ping request from gateway
996 %%
997 %% state : keep [connected]
998 %% trigger: receive pingreq packet
999 %%
1000 %% gen_statem for state machine
1001 %% @end
1002 %%------------------------------------------------------------------------------
1003
1004 handle_event(cast, ?PINGREQ_PACKET(), connected,
1005 State = #state{config = Config, socket = Socket}) ->
1006
:-(
?LOG_STATE(debug, "Receive ping request from gateway", [], State),
1007
:-(
emqttsn_send:send_pingresp(Config, Socket),
1008
:-(
{keep_state, State};
1009
1010 %%------------------------------------------------------------------------------
1011 %% @doc Send ping request to gateway
1012 %%
1013 %% state : [connected] -> [wait_pingreq]
1014 %% trigger: state timeout
1015 %%
1016 %% gen_statem for state machine
1017 %% @end
1018 %%------------------------------------------------------------------------------
1019
1020 handle_event(state_timeout, ping, connected,
1021 State = #state{config = Config, socket = Socket}) ->
1022
:-(
?LOG_STATE(debug, "Send ping request to gateway", [], State),
1023
:-(
#config{ack_timeout = AckTimeout} = Config,
1024
:-(
emqttsn_send:send_pingreq(Config, Socket),
1025
:-(
{next_state, wait_pingreq, State, {state_timeout, AckTimeout, {?RESEND_TIME_BEG}}};
1026
1027 %%------------------------------------------------------------------------------
1028 %% @doc Notify Gateway to sleep for a duration
1029 %%
1030 %% state : keep [connected]
1031 %% trigger: manual call + zero sleep interval
1032 %%
1033 %% state : [connected] -> [asleep]
1034 %% trigger: manual call + valid sleep interval
1035 %%
1036 %% gen_statem for state machine
1037 %% @end
1038 %%------------------------------------------------------------------------------
1039
1040 handle_event(cast, {sleep, Interval}, connected,
1041 State = #state{config = Config, socket = Socket}) ->
1042 1 ?LOG_STATE(debug, "Notify Gateway to sleep for a duration", [], State),
1043 1 if
1044 Interval =< 0 ->
1045
:-(
?LOG_STATE(debug, "non-positive Sleep interval ~p leads to no sleeping mode",
1046
:-(
[Interval], State),
1047
:-(
{keep_state, State};
1048 Interval > 0 ->
1049 1 #config{ack_timeout = AckTimeout} = Config,
1050 1 emqttsn_send:send_asleep(Config, Socket, Interval),
1051 1 {next_state, wait_asleep, State#state{waiting_data = {sleep, Interval}},
1052 {state_timeout, AckTimeout, {?RESEND_TIME_BEG}}}
1053 end;
1054
1055 %-------------------------------------------------------------------------------
1056 % Reconnect other gateways after failed
1057 %-------------------------------------------------------------------------------
1058
1059 %%------------------------------------------------------------------------------
1060 %% @doc Connect to other available gateway
1061 %%
1062 %% state : [connect_other] -> [found]
1063 %% trigger: enter state + have available gateway
1064 %%
1065 %% state : [connect_other] -> [initialized]
1066 %% trigger: enter state + have no available gateway
1067 %%
1068 %% state : [connect_other] -> [initialized]
1069 %% trigger: enter state + traverse known gateways exceed max times
1070 %%
1071 %% gen_statem for state machine
1072 %% @end
1073 %%------------------------------------------------------------------------------
1074
1075 handle_event(enter, _OldState, connect_other,
1076 State = #state{active_gw = #gw_collect{id = FormerId},
1077 config = Config, socket = Socket,
1078 gw_failed_cycle = TryTimes, name = Name}) ->
1079
:-(
?LOG_STATE(debug, "Connect to other available gateway", [], State),
1080
:-(
#config{reconnect_max_times = MaxTry} = Config,
1081
:-(
Desperate = TryTimes > MaxTry,
1082
:-(
AvailableGW = next_gw(Name, FormerId),
1083
:-(
FirstGW = first_gw(Name),
1084
:-(
if FirstGW =:= AvailableGW
1085
:-(
-> NewState = State#state{gw_failed_cycle = TryTimes + 1}
1086 end,
1087
:-(
case AvailableGW of
1088 #gw_info{id = GWId, host = Host, port = Port} when Desperate =:= false ->
1089
:-(
{next_state, found,
1090 NewState#state{active_gw = #gw_collect{id = GWId, host = Host,
1091 port = Port}}};
1092 _ ->
1093
:-(
{next_state, initialized, #state{name = Name, socket = Socket, config = Config}}
1094 end;
1095
1096 %-------------------------------------------------------------------------------
1097 % Sleeping feature
1098 %-------------------------------------------------------------------------------
1099
1100 %%------------------------------------------------------------------------------
1101 %% @doc Send ping request to gateway to awake
1102 %%
1103 %% state : [asleep] -> [awake]
1104 %% trigger: state timeout
1105 %%
1106 %% gen_statem for state machine
1107 %% @end
1108 %%------------------------------------------------------------------------------
1109
1110 handle_event(state_timeout, ready_awake, asleep, State = #state{config = Config, socket = Socket}) ->
1111 1 ?LOG_STATE(debug, "Send ping request to gateway to awake", [], State),
1112 1 #config{ack_timeout = AckTimeout, client_id = ClintId} = Config,
1113 1 emqttsn_send:send_awake(Config, Socket, ClintId),
1114 1 {next_state, awake, State, {state_timeout, AckTimeout,
1115 {recv_awake, ?RESEND_TIME_BEG}}};
1116
1117 %%------------------------------------------------------------------------------
1118 %% @doc Receive publish request from other clients and then wait for
1119 %% nothing at QoS 0/1
1120 %% pubrel at QoS 2
1121 %%
1122 %% state :
1123 %% QoS 0/1: keep [awake]
1124 %% QoS 2 : [awake] -> [wait_pubrec_qos2]
1125 %% trigger: receive publish packet
1126 %%
1127 %% gen_statem for state machine
1128 %% @end
1129 %%------------------------------------------------------------------------------
1130
1131 handle_event(cast, Packet = ?PUBLISH_PACKET(_RemoteDup, _RemoteQos, _RemoteRetain,
1132 _TopicIdType, _TopicId, _PacketId,
1133 _Message), awake, State) ->
1134
:-(
?LOG_STATE(debug, "Receive publish request from other clients", [], State),
1135
:-(
recv_publish(Packet, State, awake);
1136
1137 %%------------------------------------------------------------------------------
1138 %% @doc Receive pingresp request from gateway and goto asleep
1139 %%
1140 %% state : [awake] -> [asleep]
1141 %% trigger: receive pingresp packet
1142 %%
1143 %% gen_statem for state machine
1144 %% @end
1145 %%------------------------------------------------------------------------------
1146
1147 handle_event(cast, ?PINGRESP_PACKET(), awake, State = #state{config = Config}) ->
1148 1 ?LOG_STATE(debug, "Receive pingresp request from gateway and goto asleep",
1149 1 [], State),
1150 1 #config{keep_alive = PingInterval} = Config,
1151 1 {next_state, asleep, State, {state_timeout, PingInterval, ping}};
1152
1153 %%------------------------------------------------------------------------------
1154 %% @doc Answer for awake request is timeout and retry awake
1155 %%
1156 %% state : keep [awake]
1157 %% trigger: state timeout + can resend
1158 %%
1159 %% state : [awake] -> [asleep]
1160 %% trigger: state timeout + cannot resend
1161 %%
1162 %% gen_statem for state machine
1163 %% @end
1164 %%------------------------------------------------------------------------------
1165
1166 handle_event(state_timeout, {recv_awake, ResendTimes}, awake,
1167 State = #state{config = Config, socket = Socket}) ->
1168
:-(
#config{max_resend = MaxResend, client_id = ClientId} = Config,
1169
:-(
?LOG_STATE(warning, "Answer for awake request is timeout and retry awake, retry: ~p/~p",
1170
:-(
[ResendTimes, MaxResend], State),
1171
1172
:-(
if
1173 ResendTimes < MaxResend ->
1174
:-(
emqttsn_send:send_awake(Config, Socket, ClientId),
1175
:-(
{keep_state, State, {state_timeout, update, {ResendTimes + 1}}};
1176 ResendTimes >= MaxResend ->
1177
:-(
{next_state, asleep, State}
1178 end;
1179
1180 %%------------------------------------------------------------------------------
1181 %% @doc Answer for gateway address request from other clients
1182 %%
1183 %% state : keep Any
1184 %% trigger: receive searchgw packet
1185 %%
1186 %% gen_statem for state machine
1187 %% @end
1188 %%------------------------------------------------------------------------------
1189
1190 handle_event(cast, {?SEARCHGW_PACKET(Radius), Host, Port}, _StateName,
1191 State = #state{name = Name, socket = Socket, config = Config}) ->
1192
:-(
?LOG_STATE(debug, "Answer for gateway address ~p:~p request from other clients",
1193
:-(
[Host, Port], State),
1194
:-(
FirstGW = first_gw(Name),
1195
:-(
case FirstGW of
1196 #gw_info{id = GateWayId, host = GWHost} ->
1197
:-(
emqttsn_send:send_gwinfo(Config, Socket, Host, Port, Radius, GateWayId, GWHost);
1198
:-(
none -> none
1199 end,
1200
:-(
{keep_state, State};
1201
1202 %%------------------------------------------------------------------------------
1203 %% @doc Receive register request from other clients
1204 %%
1205 %% state : keep [connected]/[awake]
1206 %% trigger: receive register packet
1207 %%
1208 %% gen_statem for state machine
1209 %% @end
1210 %%------------------------------------------------------------------------------
1211
1212 handle_event(cast, ?REGISTER_PACKET(TopicId, PacketId, TopicName),
1213 StateName, State = #state{socket = Socket, topic_id_name = IdMap,
1214 topic_name_id = NameMap, config = Config})
1215 when StateName =:= connected orelse StateName =:= awake ->
1216
:-(
?LOG_STATE(debug, "Receive register request for ~p:~p from other clients, packet id: ~p",
1217
:-(
[TopicId, TopicName, PacketId], State),
1218
:-(
NewIdMap = dict:store(TopicId, TopicName, IdMap),
1219
:-(
NewNameMap = dict:store(TopicName, TopicId, NameMap),
1220
:-(
emqttsn_send:send_regack(Config, Socket, TopicId, ?RC_ACCEPTED, PacketId),
1221
:-(
{keep_state, State#state{topic_id_name = NewIdMap, topic_name_id = NewNameMap,
1222 next_packet_id = next_packet_id(PacketId)}};
1223
1224 %%------------------------------------------------------------------------------
1225 %% @doc Request gateway to disconnect
1226 %%
1227 %% state : [asleep]/[awake]/[connected] -> [initialized]
1228 %% trigger: manual call + at [asleep]/[awake]/[connected]
1229
1230 %% gen_statem for state machine
1231 %% @end
1232 %%------------------------------------------------------------------------------
1233
1234 handle_event(cast, disconnect, StateName,
1235 State = #state{name = Name,socket = Socket, config = Config})
1236 when StateName =:= asleep orelse StateName =:= awake orelse
1237 StateName =:= connected ->
1238 23 ?LOG_STATE(debug, "Request gateway to disconnect", [], State),
1239 23 emqttsn_send:send_disconnect(Config, Socket),
1240 23 {next_state, initialized, #state{name = Name, socket = Socket, config = Config}};
1241
1242 %%------------------------------------------------------------------------------
1243 %% @doc Request gateway to become active
1244 %%
1245 %% state : [asleep]/[awake] -> [found]
1246 %% trigger: manual call
1247
1248 %% gen_statem for state machine
1249 %% @end
1250 %%------------------------------------------------------------------------------
1251
1252 handle_event(cast, connect, StateName, State)
1253 when StateName =:= asleep orelse StateName =:= awake ->
1254
:-(
?LOG_STATE(debug, "Request gateway to become active", [], State),
1255
:-(
{next_state, found, State};
1256
1257 %-------------------------------------------------------------------------------
1258 % Consume message manager and counter
1259 %-------------------------------------------------------------------------------
1260
1261 %%------------------------------------------------------------------------------
1262 %% @doc Reset message manager and counter
1263 %%
1264 %% state : keep Any
1265 %% trigger: auto called by get_msg
1266
1267 %% gen_statem for state machine
1268 %% @end
1269 %%------------------------------------------------------------------------------
1270
1271 handle_event(cast, {reset_msg, MsgManager, MsgCounter}, _StateName, State) ->
1272 4 ?LOG_STATE(debug, "Reset message\nManager: ~p\nCounter:~p",
1273 4 [MsgManager, MsgCounter], State),
1274 4 {keep_state, State#state{msg_manager = MsgManager, msg_counter = MsgCounter}};
1275
1276 %-------------------------------------------------------------------------------
1277 % Change config of state machine
1278 %-------------------------------------------------------------------------------
1279
1280 %%------------------------------------------------------------------------------
1281 %% @doc Change config of client
1282 %%
1283 %% state : keep Any
1284 %% trigger: manual call
1285
1286 %% gen_statem for state machine
1287 %% @end
1288 %%------------------------------------------------------------------------------
1289
1290 handle_event(cast, {config, Config}, _StateName, State) ->
1291 1 ?LOG_STATE(debug, "Change config: ~p", [Config], State),
1292 1 {keep_state, State#state{config = Config}};
1293
1294 %-------------------------------------------------------------------------------
1295 % Get State from state machine
1296 %-------------------------------------------------------------------------------
1297
1298 %%------------------------------------------------------------------------------
1299 %% @doc Get State data of client
1300 %%
1301 %% state : keep Any
1302 %% trigger: manual call
1303
1304 %% gen_statem for state machine
1305 %% @end
1306 %%------------------------------------------------------------------------------
1307
1308 handle_event({call, From}, get_state, _StateName, State) ->
1309 69 ?LOG_STATE(debug, "Get state", [], State),
1310 69 gen_statem:reply(From, State),
1311 69 {keep_state, State};
1312
1313
1314 %%------------------------------------------------------------------------------
1315 %% @doc Get State name of client
1316 %%
1317 %% state : keep Any
1318 %% trigger: manual call
1319
1320 %% gen_statem for state machine
1321 %% @end
1322 %%------------------------------------------------------------------------------
1323
1324 handle_event({call, From}, get_state_name, StateName, State) ->
1325 8789 ?LOG_STATE(debug, "Get state name: ~p", [StateName], State),
1326 8789 gen_statem:reply(From, StateName),
1327 8789 {keep_state, State};
1328
1329 %-------------------------------------------------------------------------------
1330 % Handle disconnect at any state
1331 %-------------------------------------------------------------------------------
1332
1333 %%------------------------------------------------------------------------------
1334 %% @doc Gateway disconnect and try to reconnect
1335 %%
1336 %% state : Any -> [found]
1337 %% trigger: receive disconnect packet
1338
1339 %% gen_statem for state machine
1340 %% @end
1341 %%------------------------------------------------------------------------------
1342
1343 handle_event(cast, ?DISCONNECT_PACKET(), StateName, State)
1344 when StateName =/= wait_asleep ->
1345 11 ?LOG_STATE(warning, "Connection reset by server", [], State),
1346 11 {next_state, found, State};
1347
1348 %-------------------------------------------------------------------------------
1349 % Process incoming packet
1350 %-------------------------------------------------------------------------------
1351
1352 %%------------------------------------------------------------------------------
1353 %% @doc Low-level method for receive packet
1354 %%
1355 %% event : {recv, {Host, Port, Bin}} -> {Packet, Host, Port}
1356 %% trigger: receive ADVERTISE_PACKET/GWINFO_PACKET packet
1357 %%
1358 %% event : {recv, {Host, Port, Bin}} -> {Packet, Host, Port}/Packet
1359 %% trigger: receive other packet
1360 %%
1361 %% gen_statem for state machine
1362 %% @end
1363 %%------------------------------------------------------------------------------
1364
1365 handle_event(info, {udp, Socket, Host, Port, Bin}, _StateName, State = #state{socket = SelfSocket})
1366 when Socket =:= SelfSocket ->
1367 62 ?LOG_STATE(debug, "recv packet {~p} from host ~p:~p",
1368 62 [Bin, Host, Port], State),
1369 62 process_incoming({Host, Port, Bin}, State);
1370
1371 handle_event(info,{udp_error, _Socket, Reason}, _StateName, State) ->
1372 64 ?LOG_STATE(debug, "recv failed for reason: ~p",
1373 64 [Reason], State),
1374 64 {keep_state, State};
1375
1376 handle_event(enter, Args, StateName, State) ->
1377 74 ?LOG_STATE(debug, "useless enter with ~p at ~p",
1378 74 [Args, StateName], State),
1379 74 {keep_state, State};
1380
1381 handle_event(Operation, Args, StateName, State) ->
1382 1 ?LOG_STATE(warning, "unsupported operation ~p with ~p at ~p",
1383
:-(
[Operation, Args, StateName], State),
1384 1 {keep_state, State}.
1385 %%------------------------------------------------------------------------------
1386 %% @doc Judge whether to reserve the source of sender
1387 %% @end
1388 %%------------------------------------------------------------------------------
1389
1390 -spec filter_packet_elsewhere(mqttsn_packet(), host(), inet:port_number()) ->
1391 {mqttsn_packet(), host(), inet:port_number()} | mqttsn_packet().
1392 filter_packet_elsewhere(Packet, Host, Port) ->
1393 62 case Packet of
1394 ?ADVERTISE_PACKET(_GateWayId, _Duration) ->
1395 1 {Packet, Host, Port};
1396 ?GWINFO_PACKET(_GateWayId) ->
1397 2 {Packet, Host, Port};
1398 59 _ -> Packet
1399 end.
1400
1401 %%------------------------------------------------------------------------------
1402 %% @doc Parse incoming binary data into packet
1403 %% @end
1404 %%------------------------------------------------------------------------------
1405
1406 -spec process_incoming({host(), inet:port_number(), bitstring()}, state())
1407 -> gen_statem:event_handler_result(state()).
1408 process_incoming({Host, Port, Bin},
1409 State = #state{active_gw = #gw_collect{host = ServerHost,
1410 port = ServerPort}}) ->
1411 62 case emqttsn_frame:parse(Bin) of
1412 {ok, Packet} ->
1413 62 Ret = filter_packet_elsewhere(Packet, Host, Port),
1414 62 {keep_state, State, {next_event, cast, Ret}};
1415 _ ->
1416
:-(
?LOG(warning, "drop packet from other than gateway",
1417 #{server_host => ServerHost, server_port => ServerPort,
1418
:-(
actual_host => Host, actual_port => Port}),
1419
:-(
{keep_state, State}
1420 end.
1421
1422 %-------------------------------------------------------------------------------
1423 % Reused recv methods
1424 %-------------------------------------------------------------------------------
1425
1426 %%------------------------------------------------------------------------------
1427 %% @doc Shared processing method for publish packet
1428 %% @end
1429 %%------------------------------------------------------------------------------
1430
1431 -spec recv_publish(mqttsn_packet(), state(), connected | awake) -> gen_statem:event_handler_result(state()).
1432 recv_publish(?PUBLISH_PACKET(_RemoteDup, _RemoteQos, _RemoteRetain, TopicIdType, TopicIdOrName, PacketId, Message),
1433 State = #state{topic_id_use_qos = QosMap, config = Config, socket = Socket, topic_name_id = NameMap}, FromStateName) ->
1434
1435 6 #config{ack_timeout = AckTimeout, max_message_each_topic = TopicMaxMsg} = Config,
1436 6 TopicId = get_topic_id(TopicIdType, TopicIdOrName, NameMap),
1437 6 Qos = dict:fetch(TopicId, QosMap),
1438 6 NewState = emqttsn_utils:store_msg(State, TopicId, TopicMaxMsg, Message),
1439 6 case Qos of
1440 ?QOS_0 ->
1441 6 {keep_state, NewState#state{next_packet_id = next_packet_id(PacketId)}};
1442 ?QOS_1 ->
1443
:-(
emqttsn_send:send_puback(Config, Socket, TopicId, ?RC_ACCEPTED, PacketId),
1444
:-(
{keep_state, NewState#state{next_packet_id = PacketId}};
1445 ?QOS_2 ->
1446
:-(
emqttsn_send:send_pubrec(Config, Socket, PacketId),
1447
:-(
{next_state, wait_pubrec_qos2,
1448 NewState#state{next_packet_id = PacketId, waiting_data = {FromStateName}},
1449 {state_timeout, AckTimeout, {?RESEND_TIME_BEG}}}
1450 end.
1451
1452 %%--------------------------------------------------------------------
1453 %% packet_id generator for state machine
1454 %%--------------------------------------------------------------------
1455
1456 -spec next_packet_id(packet_id()) -> packet_id().
1457 next_packet_id(?MAX_PACKET_ID) ->
1458
:-(
1;
1459 next_packet_id(Id) ->
1460 25 Id + 1.
1461
1462 -spec get_topic_id(topic_id_type(),
1463 topic_id() | string(),
1464 dict:dict(string(), topic_id())) ->
1465 topic_id().
1466 get_topic_id(TopicIdType, TopicIdOrName, NameMap) ->
1467 15 case TopicIdType of
1468 ?PRE_DEF_TOPIC_ID ->
1469 3 TopicIdOrName;
1470 ?TOPIC_ID ->
1471
:-(
TopicIdOrName;
1472 ?SHORT_TOPIC_NAME ->
1473 12 dict:fetch(TopicIdOrName, NameMap)
1474 end.
1475
1476 -spec first_gw(string()) -> #gw_info{} | none.
1477 first_gw(Name) ->
1478
:-(
NameGW = list_to_atom(Name),
1479
:-(
NextKey = ets:first(NameGW),
1480
:-(
case NextKey of
1481 '$end_of_table' ->
1482
:-(
none;
1483 Key ->
1484
:-(
{_, GWInfo} = ets:lookup_element(NameGW, Key, 1),
1485
:-(
GWInfo
1486 end.
1487
1488 -spec next_gw(string(), gw_id()) -> #gw_info{} | none.
1489 next_gw(Name, GWId) ->
1490
:-(
NameGW = list_to_atom(Name),
1491
:-(
NextKey = ets:next(NameGW, GWId),
1492
:-(
FirstKey = ets:first(NameGW),
1493
:-(
case {NextKey, FirstKey} of
1494 {'$end_of_table', '$end_of_table'} ->
1495
:-(
none;
1496 {'$end_of_table', Key} ->
1497
:-(
{_, GWInfo} = ets:lookup_element(NameGW, Key, 1),
1498
:-(
GWInfo;
1499 {Key, _} ->
1500
:-(
{_, GWInfo} = ets:lookup_element(NameGW, Key, 1),
1501
:-(
GWInfo
1502 end.
Line Hits Source