/__w/emqttsn/emqttsn/_build/test/cover/ct/emqttsn.html

1 %%-------------------------------------------------------------------------
2 %% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%-------------------------------------------------------------------------
16
17 -module(emqttsn).
18
19 %% @headerfile "emqttsn.hrl"
20
21 -include("emqttsn.hrl").
22 -include("logger.hrl").
23
24 -include_lib("stdlib/include/assert.hrl").
25
26 -export([start_link/1, start_link/2, register/3, subscribe/5, unsubscribe/4, publish/6,
27 add_host/4, connect/3, sleep/3, get_state/1, get_state_name/1, reset_config/2, stop/1,
28 finalize/1, disconnect/1, wait_until_state_name/2]).
29
30 -export_type([client/0]).
31
32 -spec merge_opt(#config{}, [option()]) -> #config{}.
33 merge_opt(Config, [{strict_mode, Value} | Options]) ->
34 1 merge_opt(Config#config{strict_mode = Value}, Options);
35 merge_opt(Config, [{clean_session, Value} | Options]) ->
36 1 merge_opt(Config#config{clean_session = Value}, Options);
37 merge_opt(Config, [{max_size, Value} | Options]) ->
38 1 merge_opt(Config#config{max_size = Value}, Options);
39 merge_opt(Config, [{ack_timeout, Value} | Options]) ->
40 6 merge_opt(Config#config{ack_timeout = Value}, Options);
41 merge_opt(Config, [{keep_alive, Value} | Options]) ->
42 2 merge_opt(Config#config{keep_alive = Value}, Options);
43 merge_opt(Config, [{resend_no_qos, Value} | Options]) ->
44 1 merge_opt(Config#config{resend_no_qos = Value}, Options);
45 merge_opt(Config, [{max_resend, Value} | Options]) ->
46 6 merge_opt(Config#config{max_resend = Value}, Options);
47 merge_opt(Config, [{search_gw_interval, Value} | Options]) ->
48 1 merge_opt(Config#config{search_gw_interval = Value}, Options);
49 merge_opt(Config, [{reconnect_max_times, Value} | Options]) ->
50 1 merge_opt(Config#config{reconnect_max_times = Value}, Options);
51 merge_opt(Config, [{max_message_each_topic, Value} | Options]) ->
52 1 merge_opt(Config#config{max_message_each_topic = Value}, Options);
53 merge_opt(Config, [{msg_handler, Value} | Options]) ->
54 7 merge_opt(Config#config{msg_handler = Value}, Options);
55 merge_opt(Config, [{send_port, Value} | Options]) ->
56 1 merge_opt(Config#config{send_port = Value}, Options);
57 merge_opt(Config, [{proto_ver, Value} | Options]) ->
58 1 merge_opt(Config#config{proto_ver = Value}, Options);
59 merge_opt(Config, [{proto_name, Value} | Options]) ->
60 1 merge_opt(Config#config{proto_name = Value}, Options);
61 merge_opt(Config, [{radius, Value} | Options]) ->
62 1 merge_opt(Config#config{radius = Value}, Options);
63 merge_opt(Config, [{duration, Value} | Options]) ->
64 1 merge_opt(Config#config{duration = Value}, Options);
65 merge_opt(Config, [{recv_qos, Value} | Options]) ->
66 1 merge_opt(Config#config{recv_qos = Value}, Options);
67 merge_opt(Config, [{pub_qos, Value} | Options]) ->
68 5 merge_opt(Config#config{pub_qos = Value}, Options);
69 merge_opt(Config, [{will_qos, Value} | Options]) ->
70 3 merge_opt(Config#config{will_qos = Value}, Options);
71 merge_opt(Config, [{will, Value} | Options]) ->
72 4 merge_opt(Config#config{will = Value}, Options);
73 merge_opt(Config, [{will_topic, Value} | Options]) ->
74 2 merge_opt(Config#config{will_topic = Value}, Options);
75 merge_opt(Config, [{will_msg, Value} | Options]) ->
76 2 merge_opt(Config#config{will_msg = Value}, Options);
77 merge_opt(Config, [{_AnyKey, _AnyValue} | Options]) ->
78 20 merge_opt(Config, Options);
79 merge_opt(Config, []) ->
80 36 Config.
81
82 %% @doc Create an MQTT-SN client with default options
83 %%
84 %% @param Name unique name of client, used also as client id
85 %%
86 %% @equiv start_link(Name, [])
87 %% @returns `{ok, Client, Config}' on success, `{error, Reason}' otherwise
88 %% @end
89 -spec start_link(string()) -> {ok, client(), config()} | {error, term()}.
90 start_link(Name) ->
91 11 start_link(Name, []).
92
93 %% @doc Create an MQTT-SN client
94 %%
95 %% @param Name unique name of client (1 to 23 characters), also used as client id
96 %% @param Options list of client construction options
97 %%
98 %% @returns `{ok, Client, Config}' on success, `{error, Reason}' if the state machine fails to start
99 %% @end
100 -spec start_link(string(), [option()]) -> {ok, client(), config()} | {error, term()}.
101 start_link(Name, Options) ->
102 36 NameLength = string:length(Name),
103 36 ?assert(NameLength >= 1 andalso NameLength =< 23),
104 36 Config = merge_opt(#config{client_id = Name}, Options),
105
106 36 case emqttsn_state:start_link(Name, Config) of
107 {error, Reason} ->
108 1 ?LOGP(error, "gen_statem init failed, reason: ~p", [Reason]),
109 1 {error, Reason};
110 {ok, StateM} ->
111 35 {ok, StateM, Config}
112 end.
113
114 %% @doc Block until the client reaches one of the target states
115 %%
116 %% @param Client the client object
117 %% @param StateNames list of target state names to wait for
118 %%
119 %% @end
120 -spec wait_until_state_name(client(), [atom()]) -> ok.
121 wait_until_state_name(Client, StateNames) ->
122
:-(
wait_until_state_name(Client, StateNames, true).
123
124 -spec wait_until_state_name(client(), [atom()], boolean()) -> ok.
125 wait_until_state_name(Client, StateNames, Block) ->
126 8759 case {Block, get_state_name(Client)} of
127 {false, _} ->
128 10 ok;
129 {true, S} ->
130 8749 case lists:member(S, StateNames) of
131 true ->
132 36 ok;
133 false ->
134 8713 wait_until_state_name(Client, StateNames, true)
135 end
136 end.
137
138 %% @doc Cast a MQTT-SN register request
139 %%
140 %% @param Client the client object
141 %% @param TopicName topic name to be registered
142 %% @param Block whether to block (wait until ack response or timeout) or return immediately
143 %%
144 %% @end
145 -spec register(client(), string(), boolean()) -> ok.
146 register(Client, TopicName, Block) ->
147 7 gen_statem:cast(Client, {reg, TopicName}),
148 7 wait_until_state_name(Client, [connected], Block).
149
150 %% @doc Cast a MQTT-SN subscribe request
151 %%
152 %% @param Client the client object
153 %% @param TopicIdType data type of TopicIdOrName param
154 %% @param TopicIdOrName topic id or name to be sent(decided by TopicIdType)
155 %% @param MaxQos maximum QoS level that can be handled for the subscription
156 %% @param Block whether to block (wait until ack response or timeout) or return immediately
157 %%
158 %% @end
159 -spec subscribe(client(), topic_id_type(), topic_id_or_name(), qos(), boolean()) -> ok.
160 subscribe(Client, TopicIdType, TopicIdOrName, MaxQos, Block) ->
161 9 gen_statem:cast(Client, {sub, TopicIdType, TopicIdOrName, MaxQos}),
162 9 wait_until_state_name(Client, [connected], Block).
163
164 %% @doc Cast a MQTT-SN unsubscribe request
165 %%
166 %% @param TopicIdType data type of TopicIdOrName param
167 %% @param TopicIdOrName topic id or name to be sent(decided by TopicIdType)
168 %% @param Block whether to block (wait until ack response or timeout) or return immediately
169 %%
170 %% @end
171 -spec unsubscribe(client(), topic_id_type(), topic_id_or_name(), boolean()) -> ok.
172 unsubscribe(Client, TopicIdType, TopicIdOrName, Block) ->
173 2 gen_statem:cast(Client, {unsub, TopicIdType, TopicIdOrName}),
174 2 wait_until_state_name(Client, [connected], Block).
175
176 %% @doc Cast a MQTT-SN publish request
177 %%
178 %% @param Client the client object
179 %% @param Retain whether the message is retained
180 %% @param TopicIdType data type of TopicIdOrName param
181 %% @param TopicIdOrName topic id or name to be sent(decided by TopicIdType)
182 %% @param Message message data of publish request
183 %% @param Block whether to block (wait until ack response or timeout) or return immediately
184 %%
185 %% @end
186 -spec publish(client(),
187 boolean(),
188 topic_id_type(),
189 topic_id_or_name(),
190 string(),
191 boolean()) ->
192 ok.
193 publish(Client, Retain, TopicIdType, TopicIdOrName, Message, Block) ->
194 8 gen_statem:cast(Client, {pub, Retain, TopicIdType, TopicIdOrName, Message}),
195 8 wait_until_state_name(Client, [connected], Block).
196
197 %% @doc Manually add a gateway host(non-blocking)
198 %%
199 %% @param Client the client object
200 %% @param Host host of new gateway
201 %% @param Port port of new gateway
202 %% @param GatewayId gateway id of new gateway (saved locally)
203 %%
204 %% @end
205 -spec add_host(client(), host(), port(), gw_id()) -> ok.
206 add_host(Client, Host, Port, GateWayId) ->
207 19 gen_statem:cast(Client, {add_gw, Host, Port, GateWayId}).
208
209 %% @doc Cast a MQTT-SN connect request
210 %%
211 %% @param Client the client object
212 %% @param GatewayId gateway id to be connected
213 %% @param Block whether to block (wait until ack response or timeout) or return immediately
214 %%
215 %% @end
216 -spec connect(client(), gw_id(), boolean()) -> ok.
217 connect(Client, GateWayId, Block) ->
218 19 gen_statem:cast(Client, {connect, GateWayId}),
219 19 wait_until_state_name(Client, [connected], Block).
220
221 %% @doc Cast a MQTT-SN sleep request
222 %%
223 %% @param Client the client object
224 %% @param Interval sleep interval(ms)
225 %% @param Block whether to block until the client reaches the `asleep' state or return immediately
226 %%
227 %% @end
228 -spec sleep(client(), pos_integer(), boolean()) -> ok.
229 sleep(Client, Interval, Block) ->
230 1 gen_statem:cast(Client, {sleep, Interval}),
231 1 wait_until_state_name(Client, [asleep], Block).
232
233 %% @doc Cast a MQTT-SN disconnect request(non-blocking)
234 %%
235 %% @param Client the client object
236 %%
237 %% @end
238 -spec disconnect(client()) -> ok.
239 disconnect(Client) ->
240 23 gen_statem:cast(Client, disconnect).
241
242 %% @doc Get state data of the client(force-blocking)
243 %%
244 %% @param Client the client object
245 %%
246 %% @returns state data
247 %% @end
248 -spec get_state(client()) -> state().
249 get_state(Client) ->
250 41 State = gen_statem:call(Client, get_state),
251 41 State.
252
253 %% @doc Get state name of the client(force-blocking)
254 %%
255 %% @param Client the client object
256 %%
257 %% @returns state name
258 %% @end
259 -spec get_state_name(client()) -> atom().
260 get_state_name(Client) ->
261 8789 StateName = gen_statem:call(Client, get_state_name),
262 8789 StateName.
263
264 %% @doc Set new config of the client(non-blocking)
265 %%
266 %% @param Client the client object
267 %% @param Config new config to be set
268 %%
269 %% @returns ok
270 %% @end
271 -spec reset_config(client(), #config{}) -> ok.
272 reset_config(Client, Config) ->
273 1 gen_statem:cast(Client, {config, Config}).
274
275 %% @doc Stop only the state machine of the client, without sending a disconnect request(blocking)
276 %%
277 %% @param Client the client object
278 %%
279 %%
280 %% @returns `{ok, Socket}', the client socket for low-level API, can be closed if not used
281 %% @end
282 -spec stop(client()) -> {ok, inet:socket()}.
283 stop(Client) ->
284 31 #state{socket = Socket} = get_state(Client),
285 31 gen_statem:stop(Client),
286 31 {ok, Socket}.
287
288 %% @doc Stop the client, sending a disconnect request first unless it is in the `initialized' or `found' state(blocking)
289 %%
290 %% @param Client the client object
291 %%
292 %%
293 %% @returns ok
294 %% @end
295 -spec finalize(client()) -> ok.
296 finalize(Client) ->
297 30 StateName = get_state_name(Client),
298 30 case StateName =/= initialized andalso StateName =/= found of
299 true ->
300 23 disconnect(Client),
301 23 stop(Client);
302 false ->
303 7 stop(Client)
304 end,
305 30 ok.
Line Hits Source