1 |
|
%%------------------------------------------------------------------------- |
2 |
|
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%------------------------------------------------------------------------- |
16 |
|
|
17 |
|
-module(emqttsn). |
18 |
|
|
19 |
|
%% @headerfile "emqttsn.hrl" |
20 |
|
|
21 |
|
-include("emqttsn.hrl"). |
22 |
|
-include("logger.hrl"). |
23 |
|
|
24 |
|
-include_lib("stdlib/include/assert.hrl"). |
25 |
|
|
26 |
|
-export([start_link/1, start_link/2, register/3, subscribe/5, unsubscribe/4, publish/6, |
27 |
|
add_host/4, connect/3, sleep/3, get_state/1, get_state_name/1, reset_config/2, stop/1, |
28 |
|
finalize/1, disconnect/1, wait_until_state_name/2]). |
29 |
|
|
30 |
|
-export_type([client/0]). |
31 |
|
|
32 |
|
%% Fold a list of {Key, Value} options into a #config{} record.
%%
%% Options are applied left to right, so a later occurrence of a key
%% overrides an earlier one. Keys that are not recognised by apply_opt/3
%% are skipped and leave the record untouched.
-spec merge_opt(#config{}, [option()]) -> #config{}.
merge_opt(Config, []) ->
    Config;
merge_opt(Config, [{Key, Value} | Rest]) ->
    merge_opt(apply_opt(Key, Value, Config), Rest).

%% Store a single option value in the #config{} field of the same name.
%% The final clause deliberately ignores any key without a matching field.
-spec apply_opt(atom(), term(), #config{}) -> #config{}.
apply_opt(strict_mode, V, C) ->
    C#config{strict_mode = V};
apply_opt(clean_session, V, C) ->
    C#config{clean_session = V};
apply_opt(max_size, V, C) ->
    C#config{max_size = V};
apply_opt(ack_timeout, V, C) ->
    C#config{ack_timeout = V};
apply_opt(keep_alive, V, C) ->
    C#config{keep_alive = V};
apply_opt(resend_no_qos, V, C) ->
    C#config{resend_no_qos = V};
apply_opt(max_resend, V, C) ->
    C#config{max_resend = V};
apply_opt(search_gw_interval, V, C) ->
    C#config{search_gw_interval = V};
apply_opt(reconnect_max_times, V, C) ->
    C#config{reconnect_max_times = V};
apply_opt(max_message_each_topic, V, C) ->
    C#config{max_message_each_topic = V};
apply_opt(msg_handler, V, C) ->
    C#config{msg_handler = V};
apply_opt(send_port, V, C) ->
    C#config{send_port = V};
apply_opt(proto_ver, V, C) ->
    C#config{proto_ver = V};
apply_opt(proto_name, V, C) ->
    C#config{proto_name = V};
apply_opt(radius, V, C) ->
    C#config{radius = V};
apply_opt(duration, V, C) ->
    C#config{duration = V};
apply_opt(recv_qos, V, C) ->
    C#config{recv_qos = V};
apply_opt(pub_qos, V, C) ->
    C#config{pub_qos = V};
apply_opt(will_qos, V, C) ->
    C#config{will_qos = V};
apply_opt(will, V, C) ->
    C#config{will = V};
apply_opt(will_topic, V, C) ->
    C#config{will_topic = V};
apply_opt(will_msg, V, C) ->
    C#config{will_msg = V};
apply_opt(_UnknownKey, _V, C) ->
    C.
81 |
|
|
82 |
|
%% @doc Create a MQTT-SN client using the default options
%%
%% @param Name unique name of client, used also as client id
%%
%% @equiv start_link(Name, [])
%% @returns the started client together with the config it was started with,
%% or an error tuple
%% @end
-spec start_link(string()) -> {ok, client(), config()} | {error, term()}.
start_link(Name) ->
    NoOptions = [],
    start_link(Name, NoOptions).
92 |
|
|
93 |
|
%% @doc Create a MQTT-SN client
%%
%% The name is stored as the client_id of the config record, the given
%% options are merged on top of the record defaults, and the resulting
%% config is handed to emqttsn_state:start_link/2.
%%
%% @param Name unique name of client, used also as client id; its length
%% (as reported by string:length/1) is asserted to be between 1 and 23
%% @param Options array of client construction parameters
%%
%% @returns the started client together with the merged config, or the
%% error returned by the state machine
%% @end
-spec start_link(string(), [option()]) -> {ok, client(), config()} | {error, term()}.
start_link(Name, Options) ->
    NameLength = string:length(Name),
    %% NOTE: this is an assert.hrl macro, so the check raises an assertion
    %% error on failure and is compiled out when NOASSERT is defined.
    ?assert(NameLength >= 1 andalso NameLength =< 23),
    Defaults = #config{client_id = Name},
    Config = merge_opt(Defaults, Options),
    case emqttsn_state:start_link(Name, Config) of
        {ok, Client} ->
            {ok, Client, Config};
        {error, Reason} = Failure ->
            ?LOGP(error, "gen_statem init failed, reason: ~p", [Reason]),
            Failure
    end.
113 |
|
|
114 |
|
%% @doc Block until the client reaches one of the target states
%%
%% @param Client the client object
%% @param StateNames list of state names, any of which ends the wait
%%
%% @equiv wait_until_state_name(Client, StateNames, true)
%% @end
-spec wait_until_state_name(client(), [atom()]) -> ok.
wait_until_state_name(Client, StateNames) ->
    Block = true,
    wait_until_state_name(Client, StateNames, Block).
123 |
|
|
124 |
|
%% Wait for the client to enter one of StateNames.
%%
%% Block = false: the state name is queried once and ok is returned whatever
%% the answer is. The query is kept on purpose: it is a synchronous call, so
%% the function only returns after the state machine has answered it.
%%
%% Block = true: the state name is polled until it is a member of
%% StateNames. There is no upper bound on the wait; if the client never
%% reaches one of the states this function does not return.
-spec wait_until_state_name(client(), [atom()], boolean()) -> ok.
wait_until_state_name(Client, _StateNames, false) ->
    get_state_name(Client),
    ok;
wait_until_state_name(Client, StateNames, true) ->
    case lists:member(get_state_name(Client), StateNames) of
        true ->
            ok;
        false ->
            %% Pause briefly between polls instead of spinning: back-to-back
            %% synchronous calls burn CPU here and flood the state machine
            %% with get_state_name requests while it is trying to progress.
            receive
            after 10 ->
                ok
            end,
            wait_until_state_name(Client, StateNames, true)
    end.
137 |
|
|
138 |
|
%% @doc Cast a MQTT-SN register request
%%
%% Sends the request to the client asynchronously and then waits for the
%% connected state according to Block.
%%
%% @param Client the client object
%% @param TopicName topic name to be registered
%% @param Block true to wait until the client is in the connected state,
%% false to return after a single state query
%%
%% @end
-spec register(client(), string(), boolean()) -> ok.
register(Client, TopicName, Block) ->
    Request = {reg, TopicName},
    ok = gen_statem:cast(Client, Request),
    wait_until_state_name(Client, [connected], Block).
149 |
|
|
150 |
|
%% @doc Cast a MQTT-SN subscribe request
%%
%% Sends the request to the client asynchronously and then waits for the
%% connected state according to Block.
%%
%% @param Client the client object
%% @param TopicIdType data type of the TopicIdOrName param
%% @param TopicIdOrName topic id or name to subscribe to (decided by TopicIdType)
%% @param MaxQos max QoS level requested for the subscription
%% @param Block true to wait until the client is in the connected state,
%% false to return after a single state query
%%
%% @end
-spec subscribe(client(), topic_id_type(), topic_id_or_name(), qos(), boolean()) -> ok.
subscribe(Client, TopicIdType, TopicIdOrName, MaxQos, Block) ->
    Request = {sub, TopicIdType, TopicIdOrName, MaxQos},
    ok = gen_statem:cast(Client, Request),
    wait_until_state_name(Client, [connected], Block).
163 |
|
|
164 |
|
%% @doc Cast a MQTT-SN unsubscribe request
%%
%% Sends the request to the client asynchronously and then waits for the
%% connected state according to Block.
%%
%% @param Client the client object
%% @param TopicIdType data type of the TopicIdOrName param
%% @param TopicIdOrName topic id or name to unsubscribe from (decided by TopicIdType)
%% @param Block true to wait until the client is in the connected state,
%% false to return after a single state query
%%
%% @end
-spec unsubscribe(client(), topic_id_type(), topic_id_or_name(), boolean()) -> ok.
unsubscribe(Client, TopicIdType, TopicIdOrName, Block) ->
    Request = {unsub, TopicIdType, TopicIdOrName},
    ok = gen_statem:cast(Client, Request),
    wait_until_state_name(Client, [connected], Block).
175 |
|
|
176 |
|
%% @doc Cast a MQTT-SN publish request
%%
%% Sends the request to the client asynchronously and then waits for the
%% connected state according to Block.
%%
%% @param Client the client object
%% @param Retain whether the message is a retained message
%% @param TopicIdType data type of the TopicIdOrName param
%% @param TopicIdOrName topic id or name to publish to (decided by TopicIdType)
%% @param Message message data of the publish request
%% @param Block true to wait until the client is in the connected state,
%% false to return after a single state query
%%
%% @end
-spec publish(client(),
              boolean(),
              topic_id_type(),
              topic_id_or_name(),
              string(),
              boolean()) ->
                 ok.
publish(Client, Retain, TopicIdType, TopicIdOrName, Message, Block) ->
    Request = {pub, Retain, TopicIdType, TopicIdOrName, Message},
    ok = gen_statem:cast(Client, Request),
    wait_until_state_name(Client, [connected], Block).
196 |
|
|
197 |
|
%% @doc Manually add a gateway host (non-blocking)
%%
%% The gateway description is sent to the client as an asynchronous cast;
%% no state query or wait follows.
%%
%% @param Client the client object
%% @param Host host of the new gateway
%% @param Port port of the new gateway
%% @param GateWayId gateway id of the new gateway (saved locally)
%%
%% @end
-spec add_host(client(), host(), port(), gw_id()) -> ok.
add_host(Client, Host, Port, GateWayId) ->
    Gateway = {add_gw, Host, Port, GateWayId},
    gen_statem:cast(Client, Gateway).
208 |
|
|
209 |
|
%% @doc Cast a MQTT-SN connect request
%%
%% Sends the request to the client asynchronously and then waits for the
%% connected state according to Block.
%%
%% @param Client the client object
%% @param GateWayId gateway id to be connected
%% @param Block true to wait until the client is in the connected state,
%% false to return after a single state query
%%
%% @end
-spec connect(client(), gw_id(), boolean()) -> ok.
connect(Client, GateWayId, Block) ->
    Request = {connect, GateWayId},
    ok = gen_statem:cast(Client, Request),
    wait_until_state_name(Client, [connected], Block).
220 |
|
|
221 |
|
%% @doc Cast a MQTT-SN sleep request
%%
%% Sends the request to the client asynchronously and then waits for the
%% asleep state according to Block.
%%
%% @param Client the client object
%% @param Interval sleep interval (documented as milliseconds; the value is
%% passed to the state machine unchanged)
%% @param Block true to wait until the client is in the asleep state,
%% false to return after a single state query
%%
%% @end
-spec sleep(client(), pos_integer(), boolean()) -> ok.
sleep(Client, Interval, Block) ->
    Request = {sleep, Interval},
    ok = gen_statem:cast(Client, Request),
    wait_until_state_name(Client, [asleep], Block).
232 |
|
|
233 |
|
%% @doc Cast a MQTT-SN disconnect request (non-blocking)
%%
%% The request is sent as an asynchronous cast; this function does not wait
%% for the client to change state.
%%
%% @param Client the client object
%%
%% @end
-spec disconnect(client()) -> ok.
disconnect(Client) ->
    Request = disconnect,
    gen_statem:cast(Client, Request).
241 |
|
|
242 |
|
%% @doc Get state data of the client (always blocking)
%%
%% Performs a synchronous call to the client and returns its reply.
%%
%% @param Client the client object
%%
%% @returns state data
%% @end
-spec get_state(client()) -> state().
get_state(Client) ->
    gen_statem:call(Client, get_state).
252 |
|
|
253 |
|
%% @doc Get state name of the client (always blocking)
%%
%% Performs a synchronous call to the client and returns its reply.
%%
%% @param Client the client object
%%
%% @returns state name
%% @end
-spec get_state_name(client()) -> atom().
get_state_name(Client) ->
    gen_statem:call(Client, get_state_name).
263 |
|
|
264 |
|
%% @doc Set new config of the client (non-blocking)
%%
%% The config is sent as an asynchronous cast; this function does not wait
%% for the client to apply it.
%%
%% @param Client the client object
%% @param Config new config to be set
%%
%% @end
-spec reset_config(client(), #config{}) -> ok.
reset_config(Client, Config) ->
    Request = {config, Config},
    gen_statem:cast(Client, Request).
274 |
|
|
275 |
|
%% @doc Only stop the state machine client, without sending a disconnect
%%
%% The socket is read from the client's state data before the state machine
%% is stopped, so that it can be handed back to the caller.
%%
%% @param Client the client object
%%
%% @returns socket for low-level API, can be closed if not used
%% @end
-spec stop(client()) -> {ok, inet:socket()}.
stop(Client) ->
    State = get_state(Client),
    #state{socket = Socket} = State,
    ok = gen_statem:stop(Client),
    {ok, Socket}.
287 |
|
|
288 |
|
%% @doc Disconnect (when needed) and stop the client
%%
%% A disconnect request is cast first unless the client is still in the
%% initialized or found state. The state machine is then stopped via stop/1
%% and the socket that stop/1 returns is discarded.
%%
%% @param Client the client object
%%
%% @end
-spec finalize(client()) -> ok.
finalize(Client) ->
    case get_state_name(Client) of
        initialized ->
            ok;
        found ->
            ok;
        _Other ->
            disconnect(Client)
    end,
    stop(Client),
    ok.