1 |
|
%%------------------------------------------------------------------------- |
2 |
|
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%------------------------------------------------------------------------- |
16 |
|
|
17 |
|
%% @doc state machine handler for MQTT-SN client |
18 |
|
%% |
19 |
|
%% @private |
20 |
|
-module(emqttsn_state). |
21 |
|
-behavior(gen_statem). |
22 |
|
|
23 |
|
-include("emqttsn.hrl"). |
24 |
|
-include("logger.hrl"). |
25 |
|
-include_lib("stdlib/include/assert.hrl"). |
26 |
|
|
27 |
|
-export([init/1, callback_mode/0, start_link/2, handle_event/4]). |
28 |
|
|
29 |
|
|
30 |
|
|
31 |
|
%% @doc gen_statem callback: every event of every state goes through
%% handle_event/4, and an `enter' call is made on each state entry.
-spec callback_mode() -> gen_statem:callback_mode_result().
callback_mode() ->
    DispatchMode = handle_event_function,
    [DispatchMode, state_enter].
34 |
|
|
35 |
|
%% @doc Start the client state machine, registered globally under `Name'.
%%
%% The `send_port' field of `Config' is handed to init/1 together with the
%% name and the complete config. An `ignore' answer from gen_statem is
%% logged and reported to the caller as `{error, ignore}'.
-spec start_link(string(), config()) -> {ok, pid()} | {error, term()}.
start_link(Name, Config) ->
    #config{send_port = SendPort} = Config,
    InitArg = {Name, SendPort, Config},
    StartResult = gen_statem:start_link({global, Name}, ?MODULE, InitArg, []),
    case StartResult of
        {error, Why} ->
            ?LOGP(error, "gen_statem starting process failed, reason: ~p", [Why]),
            {error, Why};
        ignore ->
            ?LOGP(error, "gen_statem starting process returns ignore"),
            {error, ignore};
        {ok, StatemPid} ->
            {ok, StatemPid}
    end.
48 |
|
|
49 |
|
|
50 |
|
%% @doc gen_statem init callback.
%%
%% Calls emqttsn_utils:init_global_store/1 for `Name', then opens the UDP
%% port through emqttsn_udp:init_port/1. On success the machine starts in
%% `initialized' with the name, socket and config stored in the state record;
%% on failure the reason is logged and the process stops with it.
-spec init({string(), inet:port_number(), config()}) -> {ok, atom(), state()} | {stop, term()}.
init({Name, Port, Config}) ->
    emqttsn_utils:init_global_store(Name),
    PortResult = emqttsn_udp:init_port(Port),
    case PortResult of
        {ok, Socket} ->
            InitialState = #state{name = Name, socket = Socket, config = Config},
            {ok, initialized, InitialState};
        {error, Why} ->
            ?LOGP(error, "port init failed, reason: ~p", [Why]),
            {stop, Why}
    end.
60 |
|
|
61 |
|
%------------------------------------------------------------------------------- |
62 |
|
% Client is disconnected or before connect |
63 |
|
%------------------------------------------------------------------------------- |
64 |
|
|
65 |
|
%%------------------------------------------------------------------------------ |
66 |
|
%% @doc Find the Host of service gateway |
67 |
|
%% |
68 |
|
%% state : [initialized] -> [found] |
69 |
|
%% trigger: enter state |
70 |
|
|
71 |
|
%% gen_statem for state machine |
72 |
|
%% @end |
73 |
|
%%------------------------------------------------------------------------------ |
74 |
|
|
75 |
|
-spec handle_event(atom(), term(), atom(), state()) -> gen_statem:event_handler_result(state()). |
76 |
|
%% Enter call for `initialized': broadcast SEARCHGW and arm the state timeout
%% whose expiry triggers the next broadcast.
handle_event(enter, _OldState, initialized,
             State = #state{config = Config, socket = Socket}) ->
    ?LOG_STATE(debug, "Find the Host of service gateway", [], State),
    #config{search_gw_interval = Interval} = Config,
    %% A failed broadcast is only logged; the timeout is armed in both cases
    %% so the search is attempted again after Interval.
    case emqttsn_send:broadcast_searchgw(Config, Socket, ?DEFAULT_PORT, ?DEFAULT_RADIUS) of
        ok ->
            ok;
        {error, Reason} ->
            ?LOG_STATE(warning, "Broadcast SearchGw failed: ~p", [Reason], State)
    end,
    {keep_state, State, {state_timeout, Interval, {}}};
87 |
|
|
88 |
|
%%------------------------------------------------------------------------------ |
89 |
|
%% @doc Resend searchgw when reach time interval T_SEARCHGW |
90 |
|
%% |
91 |
|
%% state : repeat [initialized] |
92 |
|
%% trigger: state timeout |
93 |
|
|
94 |
|
%% gen_statem for state machine |
95 |
|
%% T_SEARCHGW |
96 |
|
%% @end |
97 |
|
%%------------------------------------------------------------------------------ |
98 |
|
|
99 |
|
%% Search timeout in `initialized': re-enter the same state so that the
%% enter call above runs again.
handle_event(state_timeout, {}, initialized, #state{config = Config} = State) ->
    #config{search_gw_interval = SearchInterval} = Config,
    ?LOG_STATE(debug, "Resend searchgw when reach time interval T_SEARCHGW ~p",
               [SearchInterval], State),
    {repeat_state, State};
103 |
|
|
104 |
|
%%------------------------------------------------------------------------------ |
105 |
|
%% @doc Fetch gateway from received broadcast ADVERTISE packet |
106 |
|
%% |
107 |
|
%% state : keep [initialized] |
108 |
|
%% trigger: receive advertise packet |
109 |
|
%% |
110 |
|
%% gen_statem for state machine |
111 |
|
%% @end |
112 |
|
%%------------------------------------------------------------------------------ |
113 |
|
|
114 |
|
%% ADVERTISE received in any state: record the sender as a gateway learned
%% from broadcast. The advertised duration is not used.
handle_event(cast, {?ADVERTISE_PACKET(GateWayId, _Duration), Host, Port},
             _StateName, #state{name = Name} = State) ->
    Gateway = #gw_info{id = GateWayId, host = Host, port = Port, from = ?BROADCAST},
    ?LOG_STATE(notice, "Fetch gateway id ~p at ~p:~p from received broadcast ADVERTISE packet",
               [GateWayId, Host, Port], State),
    emqttsn_utils:store_gw(Name, Gateway),
    {keep_state, State};
121 |
|
|
122 |
|
%%------------------------------------------------------------------------------ |
123 |
|
%% @doc Fetch gateway from received broadcast GWINFO packet by gateway |
124 |
|
%% |
125 |
|
%% state : keep [initialized] |
126 |
|
%% trigger: receive gwinfo packet |
127 |
|
%% |
128 |
|
%% gen_statem for state machine |
129 |
|
%% @end |
130 |
|
%%------------------------------------------------------------------------------ |
131 |
|
|
132 |
|
%% GWINFO sent by a gateway itself (delivered together with the sender's
%% host and port): record it as a gateway learned from broadcast.
handle_event(cast, {?GWINFO_PACKET(GateWayId), Host, Port},
             _StateName, #state{name = Name} = State) ->
    Gateway = #gw_info{id = GateWayId, host = Host, port = Port, from = ?BROADCAST},
    ?LOG_STATE(notice, "Fetch gateway id ~p at ~p:~p from received broadcast GWINFO packet by gateway",
               [GateWayId, Host, Port], State),
    emqttsn_utils:store_gw(Name, Gateway),
    {keep_state, State};
139 |
|
|
140 |
|
%%------------------------------------------------------------------------------ |
141 |
|
%% @doc Fetch gateway from received broadcast GWINFO packet by other client |
142 |
|
%% |
143 |
|
%% state : keep [initialized] |
144 |
|
%% trigger: receive gwinfo packet |
145 |
|
%% |
146 |
|
%% gen_statem for state machine |
147 |
|
%% the packet carries no port, so DEFAULT_PORT (1884) is assumed; this may be wrong |
148 |
|
%% @end |
149 |
|
%%------------------------------------------------------------------------------ |
150 |
|
|
151 |
|
%% GWINFO relayed by another client: the packet carries the gateway address
%% but no port, so ?DEFAULT_PORT is stored with it.
handle_event(cast, ?GWINFO_PACKET(GateWayId, GateWayAdd),
             _StateName, #state{name = Name} = State) ->
    Gateway = #gw_info{id = GateWayId, host = GateWayAdd,
                       port = ?DEFAULT_PORT, from = ?PARAPHRASE},
    ?LOG_STATE(notice, "Fetch gateway id ~p at ~p:~p from received broadcast GWINFO packet by client",
               [GateWayId, GateWayAdd, ?DEFAULT_PORT], State),
    emqttsn_utils:store_gw(Name, Gateway),
    {keep_state, State};
159 |
|
|
160 |
|
%%------------------------------------------------------------------------------ |
161 |
|
%% @doc Request to add a new gateway |
162 |
|
%% |
163 |
|
%% state : keep [initialized] |
164 |
|
%% trigger: manual call |
165 |
|
|
166 |
|
%% gen_statem for state machine |
167 |
|
%% @end |
168 |
|
%%------------------------------------------------------------------------------ |
169 |
|
|
170 |
|
%% Manual request to register a gateway; accepted in any state.
handle_event(cast, {add_gw, Host, Port, GateWayId}, _StateName,
             #state{name = Name} = State) ->
    Gateway = #gw_info{id = GateWayId, host = Host, port = Port, from = ?MANUAL},
    ?LOG_STATE(notice, "Fetch gateway id ~p at ~p:~p from manual add",
               [GateWayId, Host, Port], State),
    emqttsn_utils:store_gw(Name, Gateway),
    {keep_state, State};
177 |
|
|
178 |
|
%%------------------------------------------------------------------------------ |
179 |
|
%% @doc Request to connect to an existing gateway |
180 |
|
%% |
181 |
|
%% state : [initialized] -> [found] |
182 |
|
%% trigger: manual call |
183 |
|
|
184 |
|
%% gen_statem for state machine |
185 |
|
%% @end |
186 |
|
%%------------------------------------------------------------------------------ |
187 |
|
|
188 |
|
%% Manual connect request in `initialized'. A known gateway id moves the
%% machine to `found' with that gateway as the active one; an unknown id
%% leaves the state untouched.
handle_event(cast, {connect, GateWayId}, initialized,
             #state{name = Name, socket = Socket} = State) ->
    ?LOG_STATE(debug, "Request to connect a exist gateway id ~p",
               [GateWayId], State),
    case emqttsn_utils:get_gw(Name, GateWayId) of
        #gw_info{host = Host, port = Port} ->
            emqttsn_udp:connect(Socket, Host, Port),
            ActiveGw = #gw_collect{id = GateWayId, host = Host, port = Port},
            {next_state, found, State#state{active_gw = ActiveGw}};
        none ->
            {keep_state, State}
    end;
201 |
|
|
202 |
|
%------------------------------------------------------------------------------- |
203 |
|
% Client connecting process |
204 |
|
%------------------------------------------------------------------------------- |
205 |
|
|
206 |
|
%%------------------------------------------------------------------------------ |
207 |
|
%% @doc Connect to service gateway |
208 |
|
%% |
209 |
|
%% state : keep [found] |
210 |
|
%% trigger: enter state |
211 |
|
|
212 |
|
%% gen_statem for state machine |
213 |
|
%% @end |
214 |
|
%%------------------------------------------------------------------------------ |
215 |
|
|
216 |
|
%% Enter call for `found': send CONNECT built from the config and start the
%% acknowledgement timer with the initial resend counter.
handle_event(enter, _OldState, found,
             #state{config = Config, socket = Socket} = State) ->
    ?LOG_STATE(debug, "Connect to service gateway", [], State),
    #config{will = Will,
            clean_session = CleanSession,
            duration = Duration,
            client_id = ClientId,
            ack_timeout = AckTimeout} = Config,
    emqttsn_send:send_connect(Config, Socket, Will, CleanSession, Duration, ClientId),
    AckTimer = {state_timeout, AckTimeout, {?RESEND_TIME_BEG}},
    {keep_state, State, AckTimer};
224 |
|
|
225 |
|
%%------------------------------------------------------------------------------ |
226 |
|
%% @doc Found timeout to receive gateway response |
227 |
|
%% |
228 |
|
%% state : repeat [found] |
229 |
|
%% trigger: state timeout + can resend |
230 |
|
%% |
231 |
|
%% state : [found] -> [connect_other] |
232 |
|
%% trigger: state timeout + cannot resend |
233 |
|
%% |
234 |
|
%% gen_statem for state machine |
235 |
|
%% @end |
236 |
|
%%------------------------------------------------------------------------------ |
237 |
|
|
238 |
|
%% No gateway answer arrived in `found' before the acknowledgement timeout.
%%
%% While resends remain, CONNECT is sent again and the timer is re-armed with
%% an incremented counter. The state is kept rather than repeated on purpose:
%% `repeat_state' would run the enter call, which re-arms the timer with
%% ?RESEND_TIME_BEG, so the counter would never grow and `connect_other'
%% could never be reached.
handle_event(state_timeout, {ResendTimes}, found,
             State = #state{config = Config, socket = Socket}) ->
    #config{max_resend = MaxResend, will = Will, clean_session = CleanSession,
            duration = Duration, client_id = ClientId,
            ack_timeout = AckTimeout} = Config,
    %% Both ~p placeholders need an argument.
    ?LOG_STATE(warning, "Found timeout to receive gateway response, retry: ~p/~p",
               [ResendTimes, MaxResend], State),
    case ResendTimes < MaxResend of
        true ->
            emqttsn_send:send_connect(Config, Socket, Will, CleanSession, Duration, ClientId),
            {keep_state, State, {state_timeout, AckTimeout, {ResendTimes + 1}}};
        false ->
            {next_state, connect_other, State}
    end;
250 |
|
|
251 |
|
%%------------------------------------------------------------------------------ |
252 |
|
%% @doc Automatically answer for will_topic request |
253 |
|
%% |
254 |
|
%% state : keep [found] |
255 |
|
%% trigger: receive will_topic_req packet |
256 |
|
|
257 |
|
%% gen_statem for state machine |
258 |
|
%% @end |
259 |
|
%%------------------------------------------------------------------------------ |
260 |
|
|
261 |
|
%% WILLTOPICREQ in `found': answer with the configured will topic and QoS.
%% The retain flag is always sent as false.
handle_event(cast, ?WILLTOPICREQ_PACKET(), found,
             #state{config = Config, socket = Socket} = State) ->
    ?LOG_STATE(debug, "Automatically answer for will_topic request",
               [], State),
    #config{will_qos = WillQos, will_topic = WillTopic} = Config,
    emqttsn_send:send_willtopic(Config, Socket, WillQos, false, WillTopic),
    %% `update' changes only the content of the running state timeout.
    {keep_state, State, {state_timeout, update, connect_ack}};
269 |
|
|
270 |
|
%%------------------------------------------------------------------------------ |
271 |
|
%% @doc Automatically answer for will_msg request |
272 |
|
%% |
273 |
|
%% state : keep [found] |
274 |
|
%% trigger: receive will_msg_req packet |
275 |
|
|
276 |
|
%% gen_statem for state machine |
277 |
|
%% @end |
278 |
|
%%------------------------------------------------------------------------------ |
279 |
|
|
280 |
|
%% WILLMSGREQ in `found': answer with the configured will message.
handle_event(cast, ?WILLMSGREQ_PACKET(), found,
             #state{config = Config, socket = Socket} = State) ->
    ?LOG_STATE(debug, "Automatically answer for will_msg request",
               [], State),
    #config{will_msg = WillMessage} = Config,
    emqttsn_send:send_willmsg(Config, Socket, WillMessage),
    %% `update' changes only the content of the running state timeout.
    {keep_state, State, {state_timeout, update, connect_ack}};
287 |
|
|
288 |
|
%%------------------------------------------------------------------------------ |
289 |
|
%% @doc Gateway ensure connection is established |
290 |
|
%% |
291 |
|
%% state : [found] -> [connected] |
292 |
|
%% trigger: receive connack packet and return code success |
293 |
|
|
294 |
|
%% gen_statem for state machine |
295 |
|
%% @end |
296 |
|
%%------------------------------------------------------------------------------ |
297 |
|
|
298 |
|
%% CONNACK with an accepted return code: reset the gateway failure counter,
%% move to `connected' and arm a state timeout of `keep_alive' carrying `ping'.
handle_event(cast, ?CONNACK_PACKET(ReturnCode), found,
             #state{config = Config} = State)
  when ReturnCode == ?RC_ACCEPTED ->
    ?LOG_STATE(debug, "Gateway ensure connection is established", [], State),
    #config{keep_alive = PingInterval} = Config,
    ConnectedState = State#state{gw_failed_cycle = 0},
    PingTimer = {state_timeout, PingInterval, ping},
    {next_state, connected, ConnectedState, PingTimer};
305 |
|
|
306 |
|
%%------------------------------------------------------------------------------ |
307 |
|
%% @doc Connection is failed to establish |
308 |
|
%% |
309 |
|
%% state : keep [found] |
310 |
|
%% trigger: receive connack packet and return code failed |
311 |
|
|
312 |
|
%% gen_statem for state machine |
313 |
|
%% @end |
314 |
|
%%------------------------------------------------------------------------------ |
315 |
|
|
316 |
|
%% CONNACK with any other return code: go back to `initialized' with a fresh
%% state record that keeps only the name, socket and config.
handle_event(cast, ?CONNACK_PACKET(ReturnCode), found,
             #state{name = Name, socket = Socket, config = Config} = State) ->
    ?LOG_STATE(error, "failed for connect response, return code: ~p",
               [ReturnCode], State),
    FreshState = #state{name = Name, socket = Socket, config = Config},
    {next_state, initialized, FreshState};
321 |
|
|
322 |
|
%------------------------------------------------------------------------------- |
323 |
|
% Client is waiting for response with qos resend |
324 |
|
%------------------------------------------------------------------------------- |
325 |
|
|
326 |
|
%%------------------------------------------------------------------------------ |
327 |
|
%% @doc Finish register request and back to connected |
328 |
|
%% |
329 |
|
%% state : [wait_reg] -> [connected] |
330 |
|
%% trigger: receive regack packet |
331 |
|
|
332 |
|
%% gen_statem for state machine |
333 |
|
%% @end |
334 |
|
%%------------------------------------------------------------------------------ |
335 |
|
|
336 |
|
%% REGACK matching the outstanding packet id in `wait_reg'.
%% On acceptance the topic id <-> name mappings are recorded; on any other
%% return code they are left as they were. Either way the packet id is
%% advanced, the pending request is cleared and the machine returns to
%% `connected'.
handle_event(cast, ?REGACK_PACKET(TopicId, RemotePacketId, ReturnCode), wait_reg,
             #state{next_packet_id = LocalPacketId,
                    waiting_data = {reg, TopicName},
                    topic_id_name = IdMap,
                    topic_name_id = NameMap} = State)
  when RemotePacketId == LocalPacketId ->
    {ResultIdMap, ResultNameMap} =
        case ReturnCode of
            ?RC_ACCEPTED ->
                ?LOG_STATE(debug, "Finish register topic id ~p and back to connected, packet id ~p",
                           [TopicId, RemotePacketId], State),
                {dict:store(TopicId, TopicName, IdMap),
                 dict:store(TopicName, TopicId, NameMap)};
            _ ->
                ?LOG_STATE(error, "failed for register response, return code: ~p",
                           [ReturnCode], State),
                {IdMap, NameMap}
        end,
    {next_state, connected,
     State#state{next_packet_id = next_packet_id(RemotePacketId),
                 waiting_data = {},
                 topic_id_name = ResultIdMap,
                 topic_name_id = ResultNameMap}};
360 |
|
|
361 |
|
|
362 |
|
%%------------------------------------------------------------------------------ |
363 |
|
%% @doc Answer for register request is timeout and retry register |
364 |
|
%% |
365 |
|
%% state : keep [wait_reg] |
366 |
|
%% trigger: state timeout + can resend |
367 |
|
%% |
368 |
|
%% state : [wait_reg] -> [connected] |
369 |
|
%% trigger: state timeout + cannot resend |
370 |
|
%% |
371 |
|
%% gen_statem for state machine |
372 |
|
%% @end |
373 |
|
%%------------------------------------------------------------------------------ |
374 |
|
|
375 |
|
%% No REGACK arrived in `wait_reg' before the acknowledgement timeout.
%%
%% If resending is enabled (`resend_no_qos') and the retry budget is not used
%% up, REGISTER is sent again with the same packet id and the timer is
%% re-armed with an incremented counter. Otherwise the request is abandoned:
%% the pending data is cleared (as the publish timeouts do) and the machine
%% returns to `connected'.
handle_event(state_timeout, {ResendTimes}, wait_reg,
             State = #state{next_packet_id = PacketId, config = Config,
                            waiting_data = {reg, TopicName}, socket = Socket}) ->
    #config{max_resend = MaxResend, resend_no_qos = WhetherResend,
            ack_timeout = AckTimeout} = Config,
    %% The format is split into adjacent literals so the logged message has
    %% no embedded line break or source indentation.
    ?LOG_STATE(debug,
               "Answer for register topic name ~p is timeout "
               "and retry register, packet id: ~p, retry: ~p/~p",
               [TopicName, PacketId, ResendTimes, MaxResend], State),
    case WhetherResend andalso ResendTimes < MaxResend of
        true ->
            emqttsn_send:send_register(Config, Socket, TopicName, PacketId),
            {keep_state, State, {state_timeout, AckTimeout, {ResendTimes + 1}}};
        false ->
            {next_state, connected, State#state{waiting_data = {}}}
    end;
393 |
|
|
394 |
|
%%------------------------------------------------------------------------------ |
395 |
|
%% @doc Finish subscribe request and back to connected |
396 |
|
%% |
397 |
|
%% state : [wait_sub] -> [connected] |
398 |
|
%% trigger: receive suback packet |
399 |
|
%% |
400 |
|
%% gen_statem for state machine |
401 |
|
%% @end |
402 |
|
%%------------------------------------------------------------------------------ |
403 |
|
|
404 |
|
%% SUBACK matching the outstanding packet id in `wait_sub'.
%%
%% On acceptance the effective QoS for the topic id is the minimum of the
%% granted QoS and the local `recv_qos'. For a short topic name the
%% id <-> name mappings are recorded as well; for the other topic id types
%% the returned topic id is asserted to equal the requested one. On any other
%% return code nothing is recorded. Either way the packet id is advanced, the
%% pending request is cleared and the machine returns to `connected'.
handle_event(cast,
             ?SUBACK_PACKET(GrantQos, RemoteTopicId, RemotePacketId, ReturnCode),
             wait_sub,
             State = #state{next_packet_id = LocalPacketId, topic_id_name = IdMap,
                            topic_name_id = NameMap, topic_id_use_qos = QosMap,
                            config = Config,
                            waiting_data = {sub, TopicIdType, TopicIdOrName, _MaxQos}})
  when RemotePacketId == LocalPacketId ->
    %% The format is split into adjacent literals so the logged message has
    %% no embedded line break or source indentation.
    ?LOG_STATE(debug,
               "Finish subscribe request and back to connected, "
               "packet: ~p, topic: ~p, return_code: ~p, qos: ~p",
               [RemotePacketId, TopicIdOrName, ReturnCode, GrantQos], State),
    #config{recv_qos = LocalQos} = Config,
    case ReturnCode of
        ?RC_ACCEPTED ->
            Qos = min(GrantQos, LocalQos),
            NewQosMap = dict:store(RemoteTopicId, Qos, QosMap),
            case TopicIdType of
                ?SHORT_TOPIC_NAME ->
                    NewIdMap = dict:store(RemoteTopicId, TopicIdOrName, IdMap),
                    NewNameMap = dict:store(TopicIdOrName, RemoteTopicId, NameMap),
                    {next_state, connected,
                     State#state{next_packet_id = next_packet_id(LocalPacketId),
                                 waiting_data = {}, topic_id_name = NewIdMap,
                                 topic_name_id = NewNameMap,
                                 topic_id_use_qos = NewQosMap}};
                _ ->
                    ?assertEqual(RemoteTopicId, TopicIdOrName),
                    {next_state, connected,
                     State#state{next_packet_id = next_packet_id(LocalPacketId),
                                 waiting_data = {}, topic_id_use_qos = NewQosMap}}
            end;
        _ ->
            ?LOG_STATE(error, "failed for subscribe response, return_code: ~p",
                       [ReturnCode], State),
            {next_state, connected,
             State#state{next_packet_id = next_packet_id(LocalPacketId),
                         waiting_data = {}}}
    end;
441 |
|
|
442 |
|
%%------------------------------------------------------------------------------ |
443 |
|
%% @doc Answer for subscribe request is timeout and retry subscribe |
444 |
|
%% |
445 |
|
%% state : keep [wait_sub] |
446 |
|
%% trigger: state timeout + can resend |
447 |
|
%% |
448 |
|
%% state : [wait_sub] -> [connected] |
449 |
|
%% trigger: state timeout + cannot resend |
450 |
|
%% |
451 |
|
%% gen_statem for state machine |
452 |
|
%% @end |
453 |
|
%%------------------------------------------------------------------------------ |
454 |
|
|
455 |
|
%% No SUBACK arrived in `wait_sub' before the acknowledgement timeout.
%%
%% If resending is enabled (`resend_no_qos') and the retry budget is not used
%% up, SUBSCRIBE is sent again with the same packet id and the timer is
%% re-armed with an incremented counter. Otherwise the request is abandoned:
%% the pending data is cleared (as the publish timeouts do) and the machine
%% returns to `connected'.
handle_event(state_timeout, {ResendTimes}, wait_sub,
             State = #state{next_packet_id = PacketId, config = Config,
                            socket = Socket,
                            waiting_data = {sub, TopicIdType, TopicIdOrName, MaxQos}}) ->
    #config{max_resend = MaxResend, resend_no_qos = WhetherResend,
            ack_timeout = AckTimeout} = Config,
    ?LOG_STATE(warning, "Answer for subscribe request is timeout and retry subscribe, retry: ~p/~p",
               [ResendTimes, MaxResend], State),
    case WhetherResend andalso ResendTimes < MaxResend of
        true ->
            emqttsn_send:send_subscribe(Config, Socket, true, TopicIdType,
                                        TopicIdOrName, MaxQos, PacketId),
            {keep_state, State, {state_timeout, AckTimeout, {ResendTimes + 1}}};
        false ->
            {next_state, connected, State#state{waiting_data = {}}}
    end;
474 |
|
|
475 |
|
%%------------------------------------------------------------------------------ |
476 |
|
%% @doc Finish unsubscribe request and back to connected |
477 |
|
%% |
478 |
|
%% state : [wait_unsub] -> [connected] |
479 |
|
%% trigger: receive unsuback packet |
480 |
|
%% |
481 |
|
%% gen_statem for state machine |
482 |
|
%% @end |
483 |
|
%%------------------------------------------------------------------------------ |
484 |
|
|
485 |
|
%% UNSUBACK matching the outstanding packet id in `wait_unsub': advance the
%% packet id, clear the pending request and return to `connected'.
handle_event(cast,
             ?UNSUBACK_PACKET(RemotePacketId),
             wait_unsub,
             State = #state{next_packet_id = LocalPacketId,
                            waiting_data = {unsub, TopicIdType, TopicIdOrName}})
  when RemotePacketId == LocalPacketId ->
    %% The format is split into adjacent literals so the logged message has
    %% no embedded line break or source indentation.
    ?LOG_STATE(debug,
               "Finish unsubscribe request and back to connected, "
               "packet: ~p, topic type: ~p, topic: ~p",
               [RemotePacketId, TopicIdType, TopicIdOrName], State),
    {next_state, connected,
     State#state{next_packet_id = next_packet_id(LocalPacketId),
                 waiting_data = {}}};
495 |
|
|
496 |
|
%%------------------------------------------------------------------------------ |
497 |
|
%% @doc Answer for unsubscribe request is timeout and retry unsubscribe |
498 |
|
%% |
499 |
|
%% state : keep [wait_unsub] |
500 |
|
%% trigger: state timeout + can resend |
501 |
|
%% |
502 |
|
%% state : [wait_unsub] -> [connected] |
503 |
|
%% trigger: state timeout + cannot resend |
504 |
|
%% |
505 |
|
%% gen_statem for state machine |
506 |
|
%% @end |
507 |
|
%%------------------------------------------------------------------------------ |
508 |
|
|
509 |
|
%% No UNSUBACK arrived in `wait_unsub' before the acknowledgement timeout.
%%
%% If resending is enabled (`resend_no_qos') and the retry budget is not used
%% up, UNSUBSCRIBE is sent again with the same packet id and the timer is
%% re-armed with an incremented counter. Otherwise the request is abandoned:
%% the pending data is cleared (as the publish timeouts do) and the machine
%% returns to `connected'.
handle_event(state_timeout, {ResendTimes}, wait_unsub,
             State = #state{next_packet_id = PacketId, config = Config,
                            socket = Socket,
                            waiting_data = {unsub, TopicIdType, TopicIdOrName}}) ->
    #config{max_resend = MaxResend, resend_no_qos = WhetherResend,
            ack_timeout = AckTimeout} = Config,
    %% The message names the operation actually retried (unsubscribe).
    ?LOG_STATE(warning, "Answer for unsubscribe request is timeout and retry unsubscribe, retry: ~p/~p",
               [ResendTimes, MaxResend], State),
    case WhetherResend andalso ResendTimes < MaxResend of
        true ->
            emqttsn_send:send_unsubscribe(Config, Socket, TopicIdType, TopicIdOrName, PacketId),
            {keep_state, State, {state_timeout, AckTimeout, {ResendTimes + 1}}};
        false ->
            {next_state, connected, State#state{waiting_data = {}}}
    end;
526 |
|
%%------------------------------------------------------------------------------ |
527 |
|
%% @doc Finish publish request and back to connected when at QoS 1 |
528 |
|
%% |
529 |
|
%% state : [wait_pub_qos1] -> [connected] |
530 |
|
%% trigger: receive puback packet |
531 |
|
%% |
532 |
|
%% gen_statem for state machine |
533 |
|
%% QoS 1 |
534 |
|
%% @end |
535 |
|
%%------------------------------------------------------------------------------ |
536 |
|
|
537 |
|
%% PUBACK matching the outstanding packet id in `wait_pub_qos1'.
%%
%% For a short topic name the returned topic id is recorded against the name;
%% for (pre-defined) topic ids the returned id is asserted to equal the one
%% that was published to. On acceptance the message is stored through
%% emqttsn_utils:store_msg/4; on any other return code the failure is only
%% logged. Either way the packet id is advanced, the pending request is
%% cleared (as the REGACK/SUBACK/UNSUBACK clauses do) and the machine returns
%% to `connected'.
handle_event(cast, ?PUBACK_PACKET(RemoteTopicId, RemotePacketId, ReturnCode),
             wait_pub_qos1,
             State = #state{next_packet_id = LocalPacketId,
                            config = Config, topic_id_name = Map,
                            waiting_data = {pub, ?QOS_1, TopicIdType,
                                            LocalTopicIdOrName, Message}})
  when RemotePacketId == LocalPacketId ->
    %% The format is split into adjacent literals so the logged message has
    %% no embedded line break or source indentation.
    ?LOG_STATE(debug,
               "Finish publish request and back to connected "
               "when at QoS 1, packet: ~p, return_code: ~p",
               [RemotePacketId, ReturnCode], State),
    #config{max_message_each_topic = TopicMaxMsg} = Config,
    NewMap = case TopicIdType of
                 ?PRE_DEF_TOPIC_ID ->
                     ?assertEqual(RemoteTopicId, LocalTopicIdOrName),
                     Map;
                 ?TOPIC_ID ->
                     ?assertEqual(RemoteTopicId, LocalTopicIdOrName),
                     Map;
                 ?SHORT_TOPIC_NAME ->
                     dict:store(RemoteTopicId, LocalTopicIdOrName, Map)
             end,
    BaseState =
        case ReturnCode of
            ?RC_ACCEPTED ->
                emqttsn_utils:store_msg(State, RemoteTopicId, TopicMaxMsg, Message);
            _ ->
                ?LOG_STATE(error, "Failed for publish response, return code: ~p",
                           [ReturnCode], State),
                State
        end,
    {next_state, connected,
     BaseState#state{next_packet_id = next_packet_id(LocalPacketId),
                     topic_id_name = NewMap,
                     waiting_data = {}}};
573 |
|
|
574 |
|
|
575 |
|
%%------------------------------------------------------------------------------ |
576 |
|
%% @doc Answer for publish request is timeout and retry publish at QoS 1 |
577 |
|
%% |
578 |
|
%% state : keep [wait_pub_qos1] |
579 |
|
%% trigger: state timeout + can resend |
580 |
|
%% |
581 |
|
%% state : [wait_pub_qos1] -> [connected] |
582 |
|
%% trigger: state timeout + cannot resend |
583 |
|
%% |
584 |
|
%% gen_statem for state machine |
585 |
|
%% QoS 1 |
586 |
|
%% @end |
587 |
|
%%------------------------------------------------------------------------------ |
588 |
|
|
589 |
|
%% No PUBACK arrived in `wait_pub_qos1' before the acknowledgement timeout.
%%
%% While resends remain, PUBLISH is sent again with the DUP flag and the same
%% packet id, and the timer is re-armed with an incremented counter. Once the
%% budget is used up the pending data is cleared and the machine returns to
%% `connected'.
handle_event(state_timeout, {Retain, ResendTimes}, wait_pub_qos1,
             State = #state{next_packet_id = PacketId, config = Config,
                            socket = Socket,
                            waiting_data = {pub, ?QOS_1, TopicIdType,
                                            TopicIdOrName, Message}}) ->
    #config{max_resend = MaxResend, ack_timeout = AckTimeout} = Config,
    %% The format is split into adjacent literals so the logged message has
    %% no embedded line break or source indentation.
    ?LOG_STATE(warning,
               "Answer for publish request is timeout "
               "and retry publish at QoS 1, retain: ~p, retry: ~p/~p",
               [Retain, ResendTimes, MaxResend], State),
    case ResendTimes < MaxResend of
        true ->
            emqttsn_send:send_publish(Config, Socket, ?QOS_1, ?DUP_TRUE, Retain,
                                      TopicIdType, TopicIdOrName, Message, PacketId),
            {keep_state, State,
             {state_timeout, AckTimeout, {Retain, ResendTimes + 1}}};
        false ->
            {next_state, connected, State#state{waiting_data = {}}}
    end;
610 |
|
|
611 |
|
%%------------------------------------------------------------------------------ |
612 |
|
%% @doc Continue publish request part 2 - receive pubrec and send pubrel |
613 |
|
%% and then transfer to wait pubrel packet when at QoS 2 |
614 |
|
%% |
615 |
|
%% state : [wait_pub_qos2] -> [wait_pubrel_qos2] |
616 |
|
%% trigger: receive pubrec packet |
617 |
|
%% |
618 |
|
%% gen_statem for state machine |
619 |
|
%% QoS 2 |
620 |
|
%% @end |
621 |
|
%%------------------------------------------------------------------------------ |
622 |
|
|
623 |
|
%% PUBREC matching the outstanding packet id in `wait_pub_qos2': store the
%% published message, answer with PUBREL and wait for PUBCOMP in
%% `wait_pubrel_qos2' with a fresh acknowledgement timer.
handle_event(cast, ?PUBREC_PACKET(RemotePacketId), wait_pub_qos2,
             #state{next_packet_id = LocalPacketId,
                    config = Config,
                    topic_name_id = NameMap,
                    socket = Socket,
                    waiting_data = {pub, ?QOS_2, TopicIdType,
                                    TopicIdOrName, Message}} = State)
  when RemotePacketId == LocalPacketId ->
    ?LOG_STATE(debug, "Continue publish request part 2, packet id: ~p",
               [RemotePacketId], State),
    TopicId = get_topic_id(TopicIdType, TopicIdOrName, NameMap),
    #config{ack_timeout = AckTimeout,
            max_message_each_topic = TopicMaxMsg} = Config,
    StoredState = emqttsn_utils:store_msg(State, TopicId, TopicMaxMsg, Message),
    emqttsn_send:send_pubrel(Config, Socket, RemotePacketId),
    AwaitingComp = StoredState#state{next_packet_id = next_packet_id(RemotePacketId),
                                     waiting_data = {pubrel, ?QOS_2}},
    AckTimer = {state_timeout, AckTimeout, {?RESEND_TIME_BEG}},
    {next_state, wait_pubrel_qos2, AwaitingComp, AckTimer};
640 |
|
|
641 |
|
%%------------------------------------------------------------------------------ |
642 |
|
%% @doc Answer for publish request is timeout at part 2 - receive pubrec |
643 |
|
%% and then retry publish at QoS 2 |
644 |
|
%% |
645 |
|
%% state : keep [wait_pub_qos2] |
646 |
|
%% trigger: state timeout + can resend |
647 |
|
%% |
648 |
|
%% state : [wait_pub_qos2] -> [connected] |
649 |
|
%% trigger: state timeout + cannot resend |
650 |
|
%% |
651 |
|
%% gen_statem for state machine |
652 |
|
%% QoS 2 |
653 |
|
%% @end |
654 |
|
%%------------------------------------------------------------------------------ |
655 |
|
|
656 |
|
%% No PUBREC arrived in `wait_pub_qos2' before the acknowledgement timeout.
%%
%% While resends remain, PUBLISH is sent again with the DUP flag and the same
%% packet id, and the timer is re-armed with an incremented counter. Once the
%% budget is used up the pending data is cleared and the machine returns to
%% `connected'.
handle_event(state_timeout, {Retain, ResendTimes}, wait_pub_qos2,
             State = #state{next_packet_id = PacketId,
                            config = Config, socket = Socket,
                            waiting_data = {pub, ?QOS_2, TopicIdType,
                                            TopicIdOrName, Message}}) ->
    #config{max_resend = MaxResend, ack_timeout = AckTimeout} = Config,
    %% The format is split into adjacent literals so the logged message has
    %% no embedded line break or source indentation.
    ?LOG_STATE(warning,
               "Answer for publish request is timeout "
               "at part 2, retain: ~p, retry: ~p/~p",
               [Retain, ResendTimes, MaxResend], State),
    case ResendTimes < MaxResend of
        true ->
            emqttsn_send:send_publish(Config, Socket, ?QOS_2, ?DUP_TRUE, Retain,
                                      TopicIdType, TopicIdOrName, Message, PacketId),
            {keep_state, State,
             {state_timeout, AckTimeout, {Retain, ResendTimes + 1}}};
        false ->
            {next_state, connected, State#state{waiting_data = {}}}
    end;
677 |
|
|
678 |
|
%%------------------------------------------------------------------------------
%% @doc Finish publish request part 3 - receive pubcomp
%% and then back to connected when at QoS 2
%%
%% state  : [wait_pubrel_qos2] -> [connected]
%% trigger: receive pubcomp packet
%%
%% gen_statem for state machine
%% QoS 2
%% @end
%%------------------------------------------------------------------------------

handle_event(cast, ?PUBCOMP_PACKET(RemotePacketId), wait_pubrel_qos2,
             Data = #state{waiting_data = {pubrel, ?QOS_2}}) ->
    ?LOG_STATE(debug, "Finish publish request part 3, packet id: ~p",
               [RemotePacketId], Data),
    %% The handshake is complete: clear the pending request and derive the
    %% next packet id from the id carried by the PUBCOMP.
    Finished = Data#state{next_packet_id = next_packet_id(RemotePacketId),
                          waiting_data = {}},
    {next_state, connected, Finished};
697 |
|
|
698 |
|
%%------------------------------------------------------------------------------
%% @doc Answer for publish request is timeout at part 3 - waiting for pubcomp -
%% and then retry pubrel at QoS 2
%%
%% state  : keep [wait_pubrel_qos2]
%% trigger: state timeout + can resend
%%
%% state  : [wait_pubrel_qos2] -> [connected]
%% trigger: state timeout + cannot resend
%%
%% gen_statem for state machine
%% QoS 2
%% @end
%%------------------------------------------------------------------------------

handle_event(state_timeout, {ResendTimes}, wait_pubrel_qos2,
             State = #state{next_packet_id = PacketId,
                            config = Config, socket = Socket,
                            waiting_data = {pubrel, ?QOS_2}}) ->
    #config{max_resend = MaxResend, ack_timeout = AckTimeout} = Config,
    ?LOG_STATE(warning, "Answer for publish request is timeout at part 3, retry: ~p/~p",
               [ResendTimes, MaxResend], State),
    if
        ResendTimes < MaxResend ->
            %% The packet awaited here is PUBCOMP, so the packet to repeat is
            %% PUBREL (this clause previously resent PUBREC).
            %% NOTE(review): the PUBREC clause for wait_pub_qos2 advances
            %% next_packet_id after sending the first PUBREL, so PacketId may
            %% no longer be the id of the original PUBREL - confirm.
            emqttsn_send:send_pubrel(Config, Socket, PacketId),
            {keep_state, State,
             {state_timeout, AckTimeout, {ResendTimes + 1}}};
        ResendTimes >= MaxResend ->
            %% Give up waiting for PUBCOMP and drop the pending request.
            {next_state, connected, State#state{waiting_data = {}}}
    end;
731 |
|
|
732 |
|
%%------------------------------------------------------------------------------
%% @doc Finish receive publish part 2 - receive pubrel and send pubcomp
%% and then back to connected when at QoS 2
%%
%% state  : [wait_pubrec_qos2] -> [connected]
%% trigger: receive pubrel packet whose id equals next_packet_id
%%
%% gen_statem for state machine
%% QoS 2
%% @end
%%------------------------------------------------------------------------------

handle_event(cast, ?PUBREL_PACKET(RemotePacketId), wait_pubrec_qos2,
             Data = #state{socket = Socket, config = Config,
                           next_packet_id = LocalPacketId})
  when RemotePacketId == LocalPacketId ->
    ?LOG_STATE(debug, "Finish receive publish part 2, packet id: ~p",
               [RemotePacketId], Data),
    emqttsn_send:send_pubcomp(Config, Socket, RemotePacketId),
    %% NOTE(review): this always returns to connected, while the timeout
    %% clause of this state returns to the state kept in waiting_data -
    %% confirm this is intended.
    Advanced = Data#state{next_packet_id = next_packet_id(RemotePacketId)},
    {next_state, connected, Advanced};
753 |
|
|
754 |
|
%%------------------------------------------------------------------------------
%% @doc Answer for receive publish is timeout at part 2 - waiting for pubrel -
%% and then retry pubrec at QoS 2
%%
%% state  : keep [wait_pubrec_qos2]
%% trigger: state timeout + can resend
%%
%% state  : [wait_pubrec_qos2] -> the state name kept in waiting_data
%% trigger: state timeout + cannot resend
%%
%% gen_statem for state machine
%% QoS 2
%% @end
%%------------------------------------------------------------------------------

handle_event(state_timeout, {ResendTimes}, wait_pubrec_qos2,
             Data = #state{next_packet_id = PacketId,
                           config = Config, socket = Socket,
                           waiting_data = {FromStateName}}) ->
    #config{max_resend = MaxResend, ack_timeout = AckTimeout} = Config,
    ?LOG_STATE(warning, "Answer for receive publish is timeout at part 2, retry: ~p/~p",
               [ResendTimes, MaxResend], Data),
    case ResendTimes < MaxResend of
        true ->
            %% Repeat the PUBREC with the unchanged packet id and bump the
            %% resend counter carried in the timeout payload.
            emqttsn_send:send_pubrec(Config, Socket, PacketId),
            {keep_state, Data, {state_timeout, AckTimeout, {ResendTimes + 1}}};
        false ->
            %% Out of retries: return to the state remembered in waiting_data.
            {next_state, FromStateName, Data#state{waiting_data = {}}}
    end;
785 |
|
|
786 |
|
%%------------------------------------------------------------------------------
%% @doc Ping request answered, go back to connected
%%
%% state  : [wait_pingreq] -> [connected]
%% trigger: receive pingresp packet
%%
%% gen_statem for state machine
%% @end
%%------------------------------------------------------------------------------

handle_event(cast, ?PINGRESP_PACKET(), wait_pingreq, Data) ->
    %% State data is passed through untouched.
    ?LOG_STATE(debug, "Finish ping request and back to connected", [], Data),
    {next_state, connected, Data};
799 |
|
|
800 |
|
%%------------------------------------------------------------------------------
%% @doc Answer for ping request is timeout and retry pingreq
%%
%% state  : keep [wait_pingreq]
%% trigger: state timeout + can resend
%%
%% state  : [wait_pingreq] -> [connect_other]
%% trigger: state timeout + cannot resend
%%
%% gen_statem for state machine
%% @end
%%------------------------------------------------------------------------------

handle_event(state_timeout, {ResendTimes}, wait_pingreq,
             Data = #state{config = Config, socket = Socket}) ->
    #config{max_resend = MaxResend, ack_timeout = AckTimeout} = Config,
    ?LOG_STATE(warning, "Answer for ping request is timeout and retry pingreq, retry: ~p/~p",
               [ResendTimes, MaxResend], Data),
    case ResendTimes < MaxResend of
        true ->
            emqttsn_send:send_pingreq(Config, Socket),
            Retry = {state_timeout, AckTimeout, {ResendTimes + 1}},
            {keep_state, Data, Retry};
        false ->
            %% Retries exhausted: move on to the connect_other state.
            {next_state, connect_other, Data#state{waiting_data = {}}}
    end;
825 |
|
|
826 |
|
%%------------------------------------------------------------------------------
%% @doc Finish asleep request and go to asleep
%%
%% state  : [wait_asleep] -> [asleep]
%% trigger: receive disconnect packet while a sleep request is pending
%%
%% gen_statem for state machine
%% @end
%%------------------------------------------------------------------------------

handle_event(cast, ?DISCONNECT_PACKET(), wait_asleep,
             Data = #state{waiting_data = {sleep, Interval}}) ->
    ?LOG_STATE(debug, "Finish asleep request and go to asleep", [], Data),
    %% The requested sleep interval drives the ready_awake state timeout.
    WakeUp = {state_timeout, Interval, ready_awake},
    {next_state, asleep, Data, WakeUp};
839 |
|
|
840 |
|
%%------------------------------------------------------------------------------
%% @doc Answer for asleep request is timeout and retry asleep
%%
%% state  : keep [wait_asleep]
%% trigger: state timeout + can resend
%%
%% state  : [wait_asleep] -> [connect_other]
%% trigger: state timeout + cannot resend
%%
%% gen_statem for state machine
%% @end
%%------------------------------------------------------------------------------

handle_event(state_timeout, {ResendTimes}, wait_asleep,
             Data = #state{config = Config, socket = Socket,
                           waiting_data = {sleep, Interval}}) ->
    #config{max_resend = MaxResend, ack_timeout = AckTimeout} = Config,
    ?LOG_STATE(warning, "Answer for asleep request is timeout and retry asleep, retry: ~p/~p",
               [ResendTimes, MaxResend], Data),
    case ResendTimes < MaxResend of
        true ->
            %% Repeat the sleep request with the interval still pending.
            emqttsn_send:send_asleep(Config, Socket, Interval),
            Retry = {state_timeout, AckTimeout, {ResendTimes + 1}},
            {keep_state, Data, Retry};
        false ->
            %% Retries exhausted: move on to the connect_other state.
            {next_state, connect_other, Data#state{waiting_data = {}}}
    end;
865 |
|
|
866 |
|
%------------------------------------------------------------------------------- |
867 |
|
% Client is connected and ready for subscribe/publish |
868 |
|
%------------------------------------------------------------------------------- |
869 |
|
|
870 |
|
%%------------------------------------------------------------------------------
%% @doc Request Gateway to register and then wait for regack
%%
%% state  : [connected] -> [wait_reg]
%% trigger: manual call
%%
%% gen_statem for state machine
%% @end
%%------------------------------------------------------------------------------

handle_event(cast, {reg, TopicName}, connected,
             Data = #state{next_packet_id = PacketId,
                           socket = Socket, config = Config}) ->
    ?LOG_STATE(debug, "Request Gateway to register and then wait for regack, topic name: ~p",
               [TopicName], Data),
    #config{ack_timeout = AckTimeout} = Config,
    emqttsn_send:send_register(Config, Socket, TopicName, PacketId),
    %% Remember the request in waiting_data and start the resend counter.
    Pending = Data#state{waiting_data = {reg, TopicName}},
    {next_state, wait_reg, Pending,
     {state_timeout, AckTimeout, {?RESEND_TIME_BEG}}};
890 |
|
|
891 |
|
%%------------------------------------------------------------------------------
%% @doc Request Gateway to subscribe and then wait for suback
%%
%% state  : [connected] -> [wait_sub]
%% trigger: manual call
%%
%% gen_statem for state machine
%% @end
%%------------------------------------------------------------------------------

handle_event(cast, {sub, TopicIdType, TopicIdOrName, MaxQos}, connected,
             State = #state{next_packet_id = PacketId,
                            socket = Socket, config = Config}) ->
    %% Single-line format string: the previous literal contained a raw line
    %% break and source indentation, which ended up in the log output.
    ?LOG_STATE(debug,
               "Request Gateway to subscribe and then wait for suback, type: ~p, topic: ~p",
               [TopicIdType, TopicIdOrName], State),
    #config{ack_timeout = AckTimeout} = Config,
    %% The third argument (false) is passed through to send_subscribe/7
    %% unchanged from the original call.
    emqttsn_send:send_subscribe(Config, Socket, false, TopicIdType,
                                TopicIdOrName, MaxQos, PacketId),
    %% Remember the request in waiting_data and start the resend counter.
    {next_state, wait_sub,
     State#state{waiting_data = {sub, TopicIdType, TopicIdOrName, MaxQos}},
     {state_timeout, AckTimeout, {?RESEND_TIME_BEG}}};
911 |
|
|
912 |
|
%%------------------------------------------------------------------------------
%% @doc Request Gateway to unsubscribe and then wait for unsuback
%%
%% state  : [connected] -> [wait_unsub]
%% trigger: manual call
%%
%% gen_statem for state machine
%% @end
%%------------------------------------------------------------------------------

handle_event(cast, {unsub, TopicIdType, TopicIdOrName}, connected,
             State = #state{next_packet_id = PacketId,
                            socket = Socket, config = Config}) ->
    %% The message used to read "subscribe ... suback" (copied from the
    %% subscribe clause) and contained a raw line break; it now names the
    %% operation actually performed and is a single line.
    ?LOG_STATE(debug,
               "Request Gateway to unsubscribe and then wait for unsuback, type: ~p, topic: ~p",
               [TopicIdType, TopicIdOrName], State),
    #config{ack_timeout = AckTimeout} = Config,
    emqttsn_send:send_unsubscribe(Config, Socket, TopicIdType, TopicIdOrName, PacketId),
    %% Remember the request in waiting_data and start the resend counter.
    {next_state, wait_unsub,
     State#state{waiting_data = {unsub, TopicIdType, TopicIdOrName}},
     {state_timeout, AckTimeout, {?RESEND_TIME_BEG}}};
932 |
|
%%------------------------------------------------------------------------------
%% @doc Request Gateway to publish and then wait for
%%   nothing at QoS 0
%%   puback at QoS 1
%%   pubrec at QoS 2
%%
%% state  :
%%   QoS 0: keep [connected]
%%   QoS 1: [connected] -> [wait_pub_qos1]
%%   QoS 2: [connected] -> [wait_pub_qos2]
%% trigger: manual call
%%
%% The QoS level is taken from the pub_qos field of the client config.
%%
%% gen_statem for state machine
%% @end
%%------------------------------------------------------------------------------

handle_event(cast, {pub, Retain, TopicIdType, TopicIdOrName, Message}, connected,
             State = #state{next_packet_id = PacketId, socket = Socket,
                            topic_name_id = NameMap, config = Config}) ->
    %% The third logged value is the Retain flag (it was labelled
    %% "return code" before).
    ?LOG_STATE(debug, "Request Gateway to publish, type: ~p, topic: ~p, retain: ~p",
               [TopicIdType, TopicIdOrName, Retain], State),
    #config{ack_timeout = AckTimeout,
            max_message_each_topic = TopicMaxMsg, pub_qos = Qos} = Config,
    %% Resolved before sending, as in the original, so a failing lookup
    %% prevents the publish from going out.
    TopicId = get_topic_id(TopicIdType, TopicIdOrName, NameMap),
    emqttsn_send:send_publish(Config, Socket, Qos, ?DUP_FALSE, Retain,
                              TopicIdType, TopicIdOrName, Message, PacketId),
    case Qos of
        ?QOS_0 ->
            %% No acknowledgement expected: hand the message to store_msg/4
            %% right away and stay connected.
            NewState = emqttsn_utils:store_msg(State, TopicId, TopicMaxMsg, Message),
            {keep_state, NewState};
        ?QOS_1 ->
            {next_state, wait_pub_qos1,
             State#state{waiting_data = {pub, ?QOS_1, TopicIdType,
                                         TopicIdOrName, Message}},
             {state_timeout, AckTimeout, {Retain, ?RESEND_TIME_BEG}}};
        ?QOS_2 ->
            {next_state, wait_pub_qos2,
             State#state{waiting_data = {pub, ?QOS_2, TopicIdType,
                                         TopicIdOrName, Message}},
             {state_timeout, AckTimeout, {Retain, ?RESEND_TIME_BEG}}}
    end;
972 |
|
|
973 |
|
%%------------------------------------------------------------------------------
%% @doc Receive publish request from other clients while connected;
%% the packet is handed to recv_publish/3 together with the current state name
%%
%% state  : decided by recv_publish/3
%% trigger: receive publish packet
%%
%% gen_statem for state machine
%% @end
%%------------------------------------------------------------------------------

handle_event(cast,
             Publish = ?PUBLISH_PACKET(_Dup, _Qos, _Retain, _TopicIdType,
                                       _TopicId, _PacketId, _Message),
             connected, Data) ->
    ?LOG_STATE(debug, "Receive publish request from other clients", [], Data),
    recv_publish(Publish, Data, connected);
993 |
|
|
994 |
|
%%------------------------------------------------------------------------------
%% @doc Receive ping request from gateway and answer with pingresp
%%
%% state  : keep [connected]
%% trigger: receive pingreq packet
%%
%% gen_statem for state machine
%% @end
%%------------------------------------------------------------------------------

handle_event(cast, ?PINGREQ_PACKET(), connected,
             Data = #state{config = Config, socket = Socket}) ->
    ?LOG_STATE(debug, "Receive ping request from gateway", [], Data),
    %% Reply on the client socket; state data is left unchanged.
    emqttsn_send:send_pingresp(Config, Socket),
    {keep_state, Data};
1009 |
|
|
1010 |
|
%%------------------------------------------------------------------------------
%% @doc Send ping request to gateway
%%
%% state  : [connected] -> [wait_pingreq]
%% trigger: state timeout with content ping
%%
%% gen_statem for state machine
%% @end
%%------------------------------------------------------------------------------

handle_event(state_timeout, ping, connected,
             Data = #state{config = Config, socket = Socket}) ->
    ?LOG_STATE(debug, "Send ping request to gateway", [], Data),
    #config{ack_timeout = AckTimeout} = Config,
    emqttsn_send:send_pingreq(Config, Socket),
    %% Wait for the PINGRESP, starting the resend counter at its base value.
    AwaitResp = {state_timeout, AckTimeout, {?RESEND_TIME_BEG}},
    {next_state, wait_pingreq, Data, AwaitResp};
1026 |
|
|
1027 |
|
%%------------------------------------------------------------------------------
%% @doc Notify Gateway to sleep for a duration
%%
%% state  : keep [connected]
%% trigger: manual call + non-positive sleep interval
%%
%% state  : [connected] -> [wait_asleep]
%% trigger: manual call + positive sleep interval
%%
%% gen_statem for state machine
%% @end
%%------------------------------------------------------------------------------

handle_event(cast, {sleep, Interval}, connected,
             Data = #state{config = Config, socket = Socket}) ->
    ?LOG_STATE(debug, "Notify Gateway to sleep for a duration", [], Data),
    case Interval > 0 of
        false ->
            %% Nothing is sent for a zero or negative interval.
            ?LOG_STATE(debug, "non-positive Sleep interval ~p leads to no sleeping mode",
                       [Interval], Data),
            {keep_state, Data};
        true ->
            #config{ack_timeout = AckTimeout} = Config,
            emqttsn_send:send_asleep(Config, Socket, Interval),
            %% Keep the interval so the wait_asleep clauses can reuse it.
            Pending = Data#state{waiting_data = {sleep, Interval}},
            {next_state, wait_asleep, Pending,
             {state_timeout, AckTimeout, {?RESEND_TIME_BEG}}}
    end;
1054 |
|
|
1055 |
|
%------------------------------------------------------------------------------- |
1056 |
|
% Reconnect other gateways after failed |
1057 |
|
%------------------------------------------------------------------------------- |
1058 |
|
|
1059 |
|
%%------------------------------------------------------------------------------
%% @doc Connect to other available gateway
%%
%% state  : [connect_other] -> [found]
%% trigger: enter state + have available gateway
%%
%% state  : [connect_other] -> [initialized]
%% trigger: enter state + have no available gateway
%%
%% state  : [connect_other] -> [initialized]
%% trigger: enter state + traverse known gateways exceed max times
%%
%% gen_statem for state machine
%% @end
%%------------------------------------------------------------------------------

handle_event(enter, _OldState, connect_other,
             State = #state{active_gw = #gw_collect{id = FormerId},
                            config = Config, socket = Socket,
                            gw_failed_cycle = TryTimes, name = Name}) ->
    %% NOTE(review): gen_statem documents that a state enter call may not
    %% change the state; this clause returns next_state with a different
    %% state name - confirm how this is expected to behave.
    ?LOG_STATE(debug, "Connect to other available gateway", [], State),
    #config{reconnect_max_times = MaxTry} = Config,
    %% Too many full passes over the known gateways: stop trying.
    Desperate = TryTimes > MaxTry,
    AvailableGW = next_gw(Name, FormerId),
    FirstGW = first_gw(Name),
    %% Count one more failed cycle only when the candidate is the first
    %% gateway again. The original single-branch `if` raised if_clause
    %% whenever the candidate was any other gateway.
    NewState =
        case FirstGW =:= AvailableGW of
            true -> State#state{gw_failed_cycle = TryTimes + 1};
            false -> State
        end,
    case AvailableGW of
        #gw_info{id = GWId, host = Host, port = Port} when Desperate =:= false ->
            {next_state, found,
             NewState#state{active_gw = #gw_collect{id = GWId, host = Host,
                                                    port = Port}}};
        _ ->
            %% No usable gateway: restart from a fresh state record that keeps
            %% only name, socket and config.
            {next_state, initialized,
             #state{name = Name, socket = Socket, config = Config}}
    end;
1095 |
|
|
1096 |
|
%------------------------------------------------------------------------------- |
1097 |
|
% Sleeping feature |
1098 |
|
%------------------------------------------------------------------------------- |
1099 |
|
|
1100 |
|
%%------------------------------------------------------------------------------
%% @doc Send ping request to gateway to awake
%%
%% state  : [asleep] -> [awake]
%% trigger: state timeout with content ready_awake
%%
%% gen_statem for state machine
%% @end
%%------------------------------------------------------------------------------

handle_event(state_timeout, ready_awake, asleep,
             Data = #state{config = Config, socket = Socket}) ->
    ?LOG_STATE(debug, "Send ping request to gateway to awake", [], Data),
    #config{ack_timeout = AckTimeout, client_id = ClientId} = Config,
    emqttsn_send:send_awake(Config, Socket, ClientId),
    %% The awake timeout payload is tagged recv_awake and carries the
    %% resend counter.
    AwaitResp = {state_timeout, AckTimeout, {recv_awake, ?RESEND_TIME_BEG}},
    {next_state, awake, Data, AwaitResp};
1116 |
|
|
1117 |
|
%%------------------------------------------------------------------------------
%% @doc Receive publish request from other clients while awake;
%% the packet is handed to recv_publish/3 together with the current state name
%%
%% state  : decided by recv_publish/3
%% trigger: receive publish packet
%%
%% gen_statem for state machine
%% @end
%%------------------------------------------------------------------------------

handle_event(cast,
             Publish = ?PUBLISH_PACKET(_Dup, _Qos, _Retain, _TopicIdType,
                                       _TopicId, _PacketId, _Message),
             awake, Data) ->
    ?LOG_STATE(debug, "Receive publish request from other clients", [], Data),
    recv_publish(Publish, Data, awake);
1136 |
|
|
1137 |
|
%%------------------------------------------------------------------------------
%% @doc Receive pingresp request from gateway and goto asleep
%%
%% state  : [awake] -> [asleep]
%% trigger: receive pingresp packet
%%
%% gen_statem for state machine
%% @end
%%------------------------------------------------------------------------------

handle_event(cast, ?PINGRESP_PACKET(), awake, Data = #state{config = Config}) ->
    ?LOG_STATE(debug, "Receive pingresp request from gateway and goto asleep",
               [], Data),
    #config{keep_alive = PingInterval} = Config,
    %% NOTE(review): the timeout content is `ping`, while the asleep clause
    %% that wakes the client matches `ready_awake` - confirm which is intended.
    NextWake = {state_timeout, PingInterval, ping},
    {next_state, asleep, Data, NextWake};
1152 |
|
|
1153 |
|
%%------------------------------------------------------------------------------
%% @doc Answer for awake request is timeout and retry awake
%%
%% state  : keep [awake]
%% trigger: state timeout + can resend
%%
%% state  : [awake] -> [asleep]
%% trigger: state timeout + cannot resend
%%
%% gen_statem for state machine
%% @end
%%------------------------------------------------------------------------------

handle_event(state_timeout, {recv_awake, ResendTimes}, awake,
             State = #state{config = Config, socket = Socket}) ->
    #config{max_resend = MaxResend, ack_timeout = AckTimeout,
            client_id = ClientId} = Config,
    ?LOG_STATE(warning, "Answer for awake request is timeout and retry awake, retry: ~p/~p",
               [ResendTimes, MaxResend], State),
    if
        ResendTimes < MaxResend ->
            emqttsn_send:send_awake(Config, Socket, ClientId),
            %% Re-arm the timer with the ack timeout and keep the recv_awake
            %% tag so the next expiry reaches this clause again. The previous
            %% `{state_timeout, update, {N}}` action had no delay and dropped
            %% the tag, so the retry counter was lost.
            {keep_state, State,
             {state_timeout, AckTimeout, {recv_awake, ResendTimes + 1}}};
        ResendTimes >= MaxResend ->
            %% NOTE(review): no state timeout is armed when falling back to
            %% asleep - confirm how the client is expected to wake up again.
            {next_state, asleep, State}
    end;
1179 |
|
|
1180 |
|
%%------------------------------------------------------------------------------
%% @doc Answer for gateway address request from other clients
%%
%% state  : keep Any
%% trigger: receive searchgw packet
%%
%% gen_statem for state machine
%% @end
%%------------------------------------------------------------------------------

handle_event(cast, {?SEARCHGW_PACKET(Radius), Host, Port}, _StateName,
             Data = #state{name = Name, socket = Socket, config = Config}) ->
    ?LOG_STATE(debug, "Answer for gateway address ~p:~p request from other clients",
               [Host, Port], Data),
    %% Reply with the gateway returned by first_gw/1, if there is one;
    %% otherwise nothing is sent.
    case first_gw(Name) of
        #gw_info{id = GateWayId, host = GWHost} ->
            emqttsn_send:send_gwinfo(Config, Socket, Host, Port, Radius,
                                     GateWayId, GWHost);
        none ->
            none
    end,
    {keep_state, Data};
1201 |
|
|
1202 |
|
%%------------------------------------------------------------------------------
%% @doc Receive register request from other clients
%%
%% state  : keep [connected]/[awake]
%% trigger: receive register packet
%%
%% gen_statem for state machine
%% @end
%%------------------------------------------------------------------------------

handle_event(cast, ?REGISTER_PACKET(TopicId, PacketId, TopicName),
             StateName, Data = #state{socket = Socket, topic_id_name = IdMap,
                                      topic_name_id = NameMap, config = Config})
  when StateName =:= connected; StateName =:= awake ->
    ?LOG_STATE(debug, "Receive register request for ~p:~p from other clients, packet id: ~p",
               [TopicId, TopicName, PacketId], Data),
    %% Record the mapping in both directions (id -> name and name -> id),
    %% then acknowledge it.
    IdToName = dict:store(TopicId, TopicName, IdMap),
    NameToId = dict:store(TopicName, TopicId, NameMap),
    emqttsn_send:send_regack(Config, Socket, TopicId, ?RC_ACCEPTED, PacketId),
    Updated = Data#state{topic_id_name = IdToName,
                         topic_name_id = NameToId,
                         next_packet_id = next_packet_id(PacketId)},
    {keep_state, Updated};
1223 |
|
|
1224 |
|
%%------------------------------------------------------------------------------
%% @doc Request gateway to disconnect
%%
%% state  : [asleep]/[awake]/[connected] -> [initialized]
%% trigger: manual call + at [asleep]/[awake]/[connected]
%%
%% gen_statem for state machine
%% @end
%%------------------------------------------------------------------------------

handle_event(cast, disconnect, StateName,
             Data = #state{name = Name, socket = Socket, config = Config})
  when StateName =:= asleep; StateName =:= awake; StateName =:= connected ->
    ?LOG_STATE(debug, "Request gateway to disconnect", [], Data),
    emqttsn_send:send_disconnect(Config, Socket),
    %% Start over with a fresh state record that keeps only name, socket
    %% and config; every other field falls back to its record default.
    Fresh = #state{name = Name, socket = Socket, config = Config},
    {next_state, initialized, Fresh};
1241 |
|
|
1242 |
|
%%------------------------------------------------------------------------------
%% @doc Request gateway to become active
%%
%% state  : [asleep]/[awake] -> [found]
%% trigger: manual call
%%
%% gen_statem for state machine
%% @end
%%------------------------------------------------------------------------------

handle_event(cast, connect, StateName, Data)
  when StateName =:= asleep; StateName =:= awake ->
    %% State data is carried over unchanged into found.
    ?LOG_STATE(debug, "Request gateway to become active", [], Data),
    {next_state, found, Data};
1256 |
|
|
1257 |
|
%------------------------------------------------------------------------------- |
1258 |
|
% Consume message manager and counter |
1259 |
|
%------------------------------------------------------------------------------- |
1260 |
|
|
1261 |
|
%%------------------------------------------------------------------------------
%% @doc Reset message manager and counter
%%
%% state  : keep Any
%% trigger: auto called by get_msg
%%
%% gen_statem for state machine
%% @end
%%------------------------------------------------------------------------------

handle_event(cast, {reset_msg, MsgManager, MsgCounter}, _StateName, Data) ->
    ?LOG_STATE(debug, "Reset message\nManager: ~p\nCounter:~p",
               [MsgManager, MsgCounter], Data),
    %% Both fields are replaced wholesale with the values from the event.
    Replaced = Data#state{msg_manager = MsgManager, msg_counter = MsgCounter},
    {keep_state, Replaced};
1275 |
|
|
1276 |
|
%------------------------------------------------------------------------------- |
1277 |
|
% Change config of state machine |
1278 |
|
%------------------------------------------------------------------------------- |
1279 |
|
|
1280 |
|
%%------------------------------------------------------------------------------
%% @doc Change config of client
%%
%% state  : keep Any
%% trigger: manual call
%%
%% gen_statem for state machine
%% @end
%%------------------------------------------------------------------------------

handle_event(cast, {config, NewConfig}, _StateName, Data) ->
    ?LOG_STATE(debug, "Change config: ~p", [NewConfig], Data),
    %% The stored config is replaced as a whole; no fields are merged.
    {keep_state, Data#state{config = NewConfig}};
1293 |
|
|
1294 |
|
%------------------------------------------------------------------------------- |
1295 |
|
% Get State from state machine |
1296 |
|
%------------------------------------------------------------------------------- |
1297 |
|
|
1298 |
|
%%------------------------------------------------------------------------------
%% @doc Get State data of client
%%
%% state  : keep Any
%% trigger: manual call
%%
%% gen_statem for state machine
%% @end
%%------------------------------------------------------------------------------

handle_event({call, Caller}, get_state, _StateName, Data) ->
    ?LOG_STATE(debug, "Get state", [], Data),
    %% The caller receives the complete state record.
    gen_statem:reply(Caller, Data),
    {keep_state, Data};
1312 |
|
|
1313 |
|
|
1314 |
|
%%------------------------------------------------------------------------------
%% @doc Get State name of client
%%
%% state  : keep Any
%% trigger: manual call
%%
%% gen_statem for state machine
%% @end
%%------------------------------------------------------------------------------

handle_event({call, Caller}, get_state_name, CurrentName, Data) ->
    ?LOG_STATE(debug, "Get state name: ~p", [CurrentName], Data),
    %% The caller receives the name of the state the machine is in.
    gen_statem:reply(Caller, CurrentName),
    {keep_state, Data};
1328 |
|
|
1329 |
|
%------------------------------------------------------------------------------- |
1330 |
|
% Handle disconnect at any state |
1331 |
|
%------------------------------------------------------------------------------- |
1332 |
|
|
1333 |
|
%%------------------------------------------------------------------------------ |
1334 |
|
%% @doc Gateway disconnect and try to reconnect |
1335 |
|
%% |
1336 |
|
%% state : Any -> [found] |
1337 |
|
%% trigger: receive disconnect packet |
1338 |
|
|
1339 |
|
%% gen_statem for state machine |
1340 |
|
%% @end |
1341 |
|
%%------------------------------------------------------------------------------ |
1342 |
|
|
1343 |
|
handle_event(cast, ?DISCONNECT_PACKET(), StateName, State) |
1344 |
|
when StateName =/= wait_asleep -> |
1345 |
11 |
?LOG_STATE(warning, "Connection reset by server", [], State), |
1346 |
11 |
{next_state, found, State}; |
1347 |
|
|
1348 |
|
%------------------------------------------------------------------------------- |
1349 |
|
% Process incoming packet |
1350 |
|
%------------------------------------------------------------------------------- |
1351 |
|
|
1352 |
|
%%------------------------------------------------------------------------------ |
1353 |
|
%% @doc Low-level method for receiving packets |
1354 |
|
%% |
1355 |
|
%% event : {recv, {Host, Port, Bin}} -> {Packet, Host, Port} |
1356 |
|
%% trigger: receive ADVERTISE_PACKET/GWINFO_PACKET packet |
1357 |
|
%% |
1358 |
|
%% event : {recv, {Host, Port, Bin}} -> {Packet, Host, Port}/Packet |
1359 |
|
%% trigger: receive other packet |
1360 |
|
%% |
1361 |
|
%% gen_statem for state machine |
1362 |
|
%% @end |
1363 |
|
%%------------------------------------------------------------------------------ |
1364 |
|
|
1365 |
|
handle_event(info, {udp, Socket, Host, Port, Bin}, _StateName, State = #state{socket = SelfSocket}) |
1366 |
|
when Socket =:= SelfSocket -> |
1367 |
62 |
?LOG_STATE(debug, "recv packet {~p} from host ~p:~p", |
1368 |
62 |
[Bin, Host, Port], State), |
1369 |
62 |
process_incoming({Host, Port, Bin}, State); |
1370 |
|
|
1371 |
|
handle_event(info,{udp_error, _Socket, Reason}, _StateName, State) -> |
1372 |
64 |
?LOG_STATE(debug, "recv failed for reason: ~p", |
1373 |
64 |
[Reason], State), |
1374 |
64 |
{keep_state, State}; |
1375 |
|
|
1376 |
|
handle_event(enter, Args, StateName, State) -> |
1377 |
74 |
?LOG_STATE(debug, "useless enter with ~p at ~p", |
1378 |
74 |
[Args, StateName], State), |
1379 |
74 |
{keep_state, State}; |
1380 |
|
|
1381 |
|
handle_event(Operation, Args, StateName, State) -> |
1382 |
1 |
?LOG_STATE(warning, "unsupported operation ~p with ~p at ~p", |
1383 |
:-( |
[Operation, Args, StateName], State), |
1384 |
1 |
{keep_state, State}. |
1385 |
|
%%------------------------------------------------------------------------------
%% @doc Decide whether the sender address has to be kept with the packet
%%
%% ADVERTISE and GWINFO packets are returned as `{Packet, Host, Port}';
%% every other packet is returned bare, without the sender address.
%% @end
%%------------------------------------------------------------------------------

-spec filter_packet_elsewhere(mqttsn_packet(), host(), inet:port_number()) ->
        {mqttsn_packet(), host(), inet:port_number()} | mqttsn_packet().
filter_packet_elsewhere(Packet = ?ADVERTISE_PACKET(_GateWayId, _Duration), Host, Port) ->
  {Packet, Host, Port};
filter_packet_elsewhere(Packet = ?GWINFO_PACKET(_GateWayId), Host, Port) ->
  {Packet, Host, Port};
filter_packet_elsewhere(Packet, _Host, _Port) ->
  %% Any other packet type: the sender address is dropped.
  Packet.
1400 |
|
|
1401 |
|
%%------------------------------------------------------------------------------
%% @doc Parse incoming binary data into a packet
%%
%% When `emqttsn_frame:parse/1' returns `{ok, Packet}', the packet is
%% re-injected into the state machine as a `cast' event; the sender address is
%% attached only when `filter_packet_elsewhere/3' keeps it. Any other parse
%% result is logged and the datagram is dropped. The state data is left
%% unchanged in both cases.
%% @end
%%------------------------------------------------------------------------------

-spec process_incoming({host(), inet:port_number(), bitstring()}, state())
      -> gen_statem:event_handler_result(state()).
process_incoming({Host, Port, Bin},
                 State = #state{active_gw = #gw_collect{host = ServerHost,
                                                        port = ServerPort}}) ->
  case emqttsn_frame:parse(Bin) of
    {ok, Packet} ->
      Event = filter_packet_elsewhere(Packet, Host, Port),
      {keep_state, State, {next_event, cast, Event}};
    ParseResult ->
      %% This branch is reached because parsing did not succeed, not because
      %% of who sent the datagram, so the log says exactly that. The gateway
      %% and sender addresses are still reported for diagnosis.
      ?LOG(warning, "drop packet that failed to parse",
           #{server_host => ServerHost, server_port => ServerPort,
             actual_host => Host, actual_port => Port,
             parse_result => ParseResult}),
      {keep_state, State}
  end.
1421 |
|
|
1422 |
|
%------------------------------------------------------------------------------- |
1423 |
|
% Reused recv methods |
1424 |
|
%------------------------------------------------------------------------------- |
1425 |
|
|
1426 |
|
%%------------------------------------------------------------------------------
%% @doc Shared handling of an incoming PUBLISH packet
%%
%% The message is stored through `emqttsn_utils:store_msg/4' and then
%% acknowledged according to the QoS recorded for the topic in
%% `topic_id_use_qos' (the QoS carried by the packet itself is ignored):
%%   QoS 0 - nothing is sent, state is kept
%%   QoS 1 - a PUBACK with `?RC_ACCEPTED' is sent, state is kept
%%   QoS 2 - a PUBREC is sent and the machine moves to `wait_pubrec_qos2',
%%           remembering `FromStateName' in `waiting_data' and arming a
%%           state timeout of `ack_timeout'
%%
%% `dict:fetch/2' raises if the topic id has no recorded QoS.
%%
%% NOTE(review): QoS 0 stores `next_packet_id(PacketId)' while QoS 1/2 store
%% `PacketId' unchanged - confirm this asymmetry is intended.
%% @end
%%------------------------------------------------------------------------------

-spec recv_publish(mqttsn_packet(), state(), connected | awake) -> gen_statem:event_handler_result(state()).
recv_publish(?PUBLISH_PACKET(_RemoteDup, _RemoteQos, _RemoteRetain, TopicIdType, TopicIdOrName, PacketId, Message),
             State = #state{topic_id_use_qos = QosMap, config = Config, socket = Socket, topic_name_id = NameMap},
             FromStateName) ->
  #config{ack_timeout = AckTimeout, max_message_each_topic = TopicMaxMsg} = Config,
  %% Resolve the topic first: the QoS lookup and the message store both need it.
  TopicId = get_topic_id(TopicIdType, TopicIdOrName, NameMap),
  TopicQos = dict:fetch(TopicId, QosMap),
  Stored = emqttsn_utils:store_msg(State, TopicId, TopicMaxMsg, Message),
  case TopicQos of
    ?QOS_0 ->
      Advanced = next_packet_id(PacketId),
      {keep_state, Stored#state{next_packet_id = Advanced}};
    ?QOS_1 ->
      emqttsn_send:send_puback(Config, Socket, TopicId, ?RC_ACCEPTED, PacketId),
      {keep_state, Stored#state{next_packet_id = PacketId}};
    ?QOS_2 ->
      emqttsn_send:send_pubrec(Config, Socket, PacketId),
      Waiting = Stored#state{next_packet_id = PacketId, waiting_data = {FromStateName}},
      {next_state, wait_pubrec_qos2, Waiting,
       {state_timeout, AckTimeout, {?RESEND_TIME_BEG}}}
  end.
1451 |
|
|
1452 |
|
%%--------------------------------------------------------------------
%% packet_id generator for state machine
%%
%% Returns the successor of the given packet id; the id following
%% ?MAX_PACKET_ID is 1.
%%--------------------------------------------------------------------

-spec next_packet_id(packet_id()) -> packet_id().
next_packet_id(Current) when Current =:= ?MAX_PACKET_ID ->
  1;
next_packet_id(Current) ->
  Current + 1.
1461 |
|
|
1462 |
|
%% Resolve the topic id a packet refers to.
%%
%% For ?PRE_DEF_TOPIC_ID and ?TOPIC_ID the second argument is returned
%% unchanged; for ?SHORT_TOPIC_NAME it is used as a key into NameMap
%% (dict:fetch/2 raises when the name is not present).
-spec get_topic_id(topic_id_type(),
                   topic_id() | string(),
                   dict:dict(string(), topic_id())) ->
        topic_id().
get_topic_id(TopicIdType, TopicIdOrName, NameMap) ->
  case TopicIdType of
    IdType when IdType =:= ?PRE_DEF_TOPIC_ID; IdType =:= ?TOPIC_ID ->
      TopicIdOrName;
    ?SHORT_TOPIC_NAME ->
      dict:fetch(TopicIdOrName, NameMap)
  end.
1475 |
|
|
1476 |
|
%% Return the gateway info stored under the first key of the ETS table named
%% after Name, or `none' when the table is empty.
%%
%% Raises badarg when no table with that name exists.
-spec first_gw(string()) -> #gw_info{} | none.
first_gw(Name) ->
  %% The table is addressed by name, so its atom must already exist;
  %% list_to_existing_atom/1 avoids creating new atoms from a string.
  NameGW = list_to_existing_atom(Name),
  case ets:first(NameGW) of
    '$end_of_table' ->
      none;
    Key ->
      %% ets:lookup_element(Tab, Key, 1) would return only the element at
      %% position 1, never the whole object, so the full object is read
      %% with ets:lookup/2 instead.
      %% Assumes a set-type table holding {GWId, #gw_info{}} objects, as the
      %% original `{_, GWInfo}' match implies - TODO confirm against the
      %% code that inserts into this table.
      [{_, GWInfo}] = ets:lookup(NameGW, Key),
      GWInfo
  end.
1487 |
|
|
1488 |
|
%% Return the gateway info stored under the key following GWId in the ETS
%% table named after Name, wrapping around to the first key when GWId is the
%% last one. Returns `none' when the table is empty.
%%
%% Raises badarg when no table with that name exists.
-spec next_gw(string(), gw_id()) -> #gw_info{} | none.
next_gw(Name, GWId) ->
  %% The table is addressed by name, so its atom must already exist;
  %% list_to_existing_atom/1 avoids creating new atoms from a string.
  NameGW = list_to_existing_atom(Name),
  Candidate =
    case ets:next(NameGW, GWId) of
      '$end_of_table' ->
        %% GWId was the last key: wrap around to the start of the table.
        ets:first(NameGW);
      Following ->
        Following
    end,
  case Candidate of
    '$end_of_table' ->
      none;
    Key ->
      %% ets:lookup_element(Tab, Key, 1) would return only the element at
      %% position 1, never the whole object, so the full object is read
      %% with ets:lookup/2 instead.
      %% Assumes a set-type table holding {GWId, #gw_info{}} objects, as the
      %% original `{_, GWInfo}' match implies - TODO confirm against the
      %% code that inserts into this table.
      [{_, GWInfo}] = ets:lookup(NameGW, Key),
      GWInfo
  end.