/__w/emqttsn/emqttsn/_build/test/cover/ct/emqttsn_utils.html

1 %%-------------------------------------------------------------------------
2 %% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%-------------------------------------------------------------------------
16
17 %% @doc utilities for message and gateway management, often used with
18 %% asynchronous message handling.
19 -module(emqttsn_utils).
20
21 %% @headerfile "emqttsn.hrl"
22
23 -include("emqttsn.hrl").
24 -include("logger.hrl").
25
26 -export([init_global_store/1, store_msg/4, get_msg/2, get_msg/1, get_one_topic_msg/3,
27 get_all_topic_id/1, get_topic_id_from_name/3, store_gw/2, get_gw/3, get_gw/2,
28 get_all_gw/2, get_all_gw/1, default_msg_handler/2]).
29
30 %%--------------------------------------------------------------------
31 %% gateway management lower utilities
32 %%--------------------------------------------------------------------
33
%% @doc create the ets table that stores gateways, if it does not exist yet
%%
%% The table is a public, named set keyed on the id field of the
%% gw_info record. Calling this again for an existing table is a no-op.
%%
%% Caution: not needed in most cases; if you do not know about
%% mqttsn gateway storage, do not use it.
%%
%% @param Name unique name of client, used also as client id
%%
%% @end
-spec init_global_store(string()) -> ok.
init_global_store(Name) ->
    Table = list_to_atom(Name),
    case ets:whereis(Table) =:= undefined of
        true ->
            %% No table registered under this name yet: create it.
            _ = ets:new(Table, [{keypos, #gw_info.id}, named_table, public, set]),
            ok;
        false ->
            ok
    end.
53
54 %%--------------------------------------------------------------------
55 %% message management lower utilities
56 %%--------------------------------------------------------------------
57
%% @doc default message handler used when messages are handled synchronously
%%
%% Logs the incoming message together with its topic id at notice
%% level and returns ok.
%%
%% Caution: not needed in most cases; if you do not know about
%% message handlers, do not use it.
%%
%% @param TopicId topic id of incoming message
%% @param Msg data of incoming message
%%
%% @end
-spec default_msg_handler(topic_id(), string()) -> ok.
default_msg_handler(TopicId, Msg) ->
    %% Argument order follows the format string: message first, topic id second.
    ?LOGP(notice, "new message: ~p, topic id: ~p", [Msg, TopicId]),
    ok.
71
%% Invoke every registered handler, in list order, with the topic id and the
%% message. Handler return values are discarded; an exception raised by a
%% handler propagates to the caller.
-spec msg_handler_recall([msg_handler()], topic_id(), string()) -> ok.
msg_handler_recall(Handlers, TopicId, Msg) ->
    _ = [Callback(TopicId, Msg) || Callback <- Handlers],
    ok.
76
%% Append Msg to the per-topic queue held in MsgManager and keep the
%% per-topic message count in MsgCounter in step with it.
%%
%% While the topic holds fewer than TopicMax messages the message is simply
%% enqueued and the count incremented. Once the count has reached TopicMax
%% the oldest message is dropped before the new one is enqueued, so the count
%% stays unchanged. A topic seen for the first time starts with a one-element
%% queue and a count of 1, so that the count matches the queue length.
%%
%% Returns {NewMsgManager, NewMsgCounter}.
-spec store_msg_async(dict:dict(topic_id(), queue:queue()),
                      dict:dict(topic_id(), non_neg_integer()),
                      topic_id(),
                      non_neg_integer(),
                      string()) ->
                         {dict:dict(topic_id(), queue:queue()),
                          dict:dict(topic_id(), non_neg_integer())}.
store_msg_async(MsgManager, MsgCounter, TopicId, TopicMax, Msg) ->
    {Num, NewQueue} =
        case {dict:find(TopicId, MsgCounter), dict:find(TopicId, MsgManager)} of
            {{ok, OldNum}, {ok, MsgQueue}} when OldNum < TopicMax ->
                {OldNum + 1, queue:in(Msg, MsgQueue)};
            {{ok, OldNum}, {ok, MsgQueue}} ->
                %% Topic is full: evict the oldest message to make room.
                {{value, _Dropped}, TmpQueue} = queue:out(MsgQueue),
                {OldNum, queue:in(Msg, TmpQueue)};
            {error, error} ->
                %% First message for this topic: one message is now stored.
                {1, queue:from_list([Msg])}
        end,
    NewMsgCounter = dict:store(TopicId, Num, MsgCounter),
    NewMsgManager = dict:store(TopicId, NewQueue, MsgManager),
    {NewMsgManager, NewMsgCounter}.
100
%% Remove everything stored for TopicId and hand it back to the caller.
%%
%% Returns {NewMsgManager, NewMsgCounter, Messages}: the two dictionaries with
%% the topic erased and the queued messages as a list in queue order. When the
%% topic has no queue, both dictionaries are returned untouched with an empty
%% message list.
-spec refresh_msg_manager(topic_id(),
                          dict:dict(topic_id(), queue:queue()),
                          dict:dict(topic_id(), non_neg_integer())) ->
                             {dict:dict(topic_id(), queue:queue()),
                              dict:dict(topic_id(), non_neg_integer()),
                              [string()]}.
refresh_msg_manager(TopicId, MsgManager, MsgCounter) ->
    case dict:is_key(TopicId, MsgManager) of
        false ->
            {MsgManager, MsgCounter, []};
        true ->
            Pending = queue:to_list(dict:fetch(TopicId, MsgManager)),
            {dict:erase(TopicId, MsgManager), dict:erase(TopicId, MsgCounter), Pending}
    end.
116
117 %%--------------------------------------------------------------------
118 %% message management API
119 %%--------------------------------------------------------------------
120
%% @doc store a message (async handler API)
%%
%% When the client config has no message handlers, the message is
%% queued in the state for later retrieval. Otherwise every handler
%% is invoked with the message and the state is returned with its
%% message storage unchanged.
%%
%% Caution: not needed in most cases; if you do not know about
%% message storage, do not use it.
%%
%% @param State state data of state machine
%% @param TopicId topic id of message
%% @param TopicMax maximum number of messages stored for one topic
%% @param Msg data of message
%%
%% @end
-spec store_msg(state(), topic_id(), non_neg_integer(), string()) -> state().
store_msg(State, TopicId, TopicMax, Msg) ->
    #state{msg_manager = Queues,
           msg_counter = Counts,
           config = Config} =
        State,
    #config{msg_handler = Handlers} = Config,
    case Handlers of
        [] ->
            %% No handler registered: keep the message until it is fetched.
            {Queues1, Counts1} = store_msg_async(Queues, Counts, TopicId, TopicMax, Msg),
            State#state{msg_manager = Queues1, msg_counter = Counts1};
        _ ->
            %% Handlers consume the message directly; storage is not touched.
            msg_handler_recall(Handlers, TopicId, Msg),
            State
    end.
148
%% @doc collect all messages of a topic id (async handler API)
%%
%% Returns invalid when the client is configured with message
%% handlers. Otherwise the messages stored for the topic are removed
%% from the client state and returned. In blocking mode the call
%% polls the client until at least one message is available, pausing
%% briefly between attempts.
%%
%% @param Client the client object
%% @param TopicId topic id to fetch messages
%% @param Block whether to make a blocking request (wait until a message exists)
%%
%% @end
-spec get_one_topic_msg(client(), topic_id(), boolean()) -> {ok, [string()]} | invalid.
get_one_topic_msg(Client, TopicId, Block) ->
    State = gen_statem:call(Client, get_state),
    #state{msg_manager = MsgManager,
           msg_counter = MsgCounter,
           config = Config} =
        State,
    #config{msg_handler = Handlers} = Config,
    case Handlers of
        [] ->
            {NewMsgManager, NewMsgCounter, Messages} =
                refresh_msg_manager(TopicId, MsgManager, MsgCounter),
            %% NOTE(review): the state is read with a call and written back
            %% with a cast; presumably messages stored in between can be
            %% overwritten - confirm against the reset_msg handler.
            case {Block, Messages} of
                {false, _} ->
                    gen_statem:cast(Client, {reset_msg, NewMsgManager, NewMsgCounter}),
                    {ok, Messages};
                {true, []} ->
                    %% Nothing yet: pause 10 ms before polling again instead
                    %% of hammering the state machine in a tight loop.
                    receive after 10 -> ok end,
                    get_one_topic_msg(Client, TopicId, true);
                {true, _} ->
                    gen_statem:cast(Client, {reset_msg, NewMsgManager, NewMsgCounter}),
                    {ok, Messages}
            end;
        _ ->
            invalid
    end.
180
%% @doc collect all messages of some topic ids (async handler API, non-blocking)
%%
%% Returns a dict mapping each requested topic id to its messages,
%% or invalid when the per-topic fetch reports invalid.
%%
%% @param Client the client object
%% @param TopicIds list of topic ids to fetch messages for
%%
%% @end
-spec get_msg(client(), [topic_id()]) ->
                 {ok, dict:dict(topic_id(), [string()])} | invalid.
get_msg(Client, TopicIds) ->
    %% Pass invalid through instead of crashing on a failed match, as the
    %% spec promises invalid as a possible result.
    case get_msg(Client, TopicIds, []) of
        {ok, KVList} ->
            {ok, dict:from_list(KVList)};
        invalid ->
            invalid
    end.
192
%% Fetch the messages of each topic id in turn (non-blocking) and append the
%% {TopicId, Messages} pairs to Ret, preserving the order of TopicIds.
%% Stops and returns invalid as soon as one fetch reports invalid.
-spec get_msg(client(), [topic_id()], [{topic_id(), [string()]}]) ->
                 {ok, [{topic_id(), [string()]}]} | invalid.
get_msg(Client, TopicIds, Ret) ->
    %% The helper accumulates in reverse so each step is a constant-time
    %% cons rather than a list append.
    collect_topic_msgs(Client, TopicIds, lists:reverse(Ret)).

%% Worker for get_msg/3: RevAcc holds the collected pairs, newest first.
-spec collect_topic_msgs(client(), [topic_id()], [{topic_id(), [string()]}]) ->
                            {ok, [{topic_id(), [string()]}]} | invalid.
collect_topic_msgs(_Client, [], RevAcc) ->
    {ok, lists:reverse(RevAcc)};
collect_topic_msgs(Client, [TopicId | TopicIds], RevAcc) ->
    case get_one_topic_msg(Client, TopicId, false) of
        invalid ->
            invalid;
        {ok, MsgOfTopicId} ->
            collect_topic_msgs(Client, TopicIds, [{TopicId, MsgOfTopicId} | RevAcc])
    end.
204
%% @doc collect all messages (async handler API, non-blocking)
%%
%% Reads the client state, takes every topic id that currently has
%% stored messages and fetches the messages of all of them.
%%
%% @param Client the client object
%% @equiv get_msg(Client, AllTopic)
%%
%% @end
-spec get_msg(client()) -> {ok, dict:dict(topic_id(), [string()])} | invalid.
get_msg(Client) ->
    #state{msg_manager = Stored} = gen_statem:call(Client, get_state),
    AllTopicIds = dict:fetch_keys(Stored),
    get_msg(Client, AllTopicIds).
216
%% @doc collect all topic ids that currently have stored messages (non-blocking)
%%
%% The ids are the keys of the message storage in the client state.
%%
%% @param Client the client object
%%
%% @end
-spec get_all_topic_id(client()) -> [topic_id()].
get_all_topic_id(Client) ->
    #state{msg_manager = Stored} = gen_statem:call(Client, get_state),
    dict:fetch_keys(Stored).
227
%% @doc get topic id from topic name
%%
%% Looks the name up in the topic name to id mapping of the client
%% state. A non-blocking request returns none when the name is not
%% known; a blocking request polls the client, pausing briefly
%% between attempts, until the name is known.
%%
%% @param Client the client object
%% @param TopicName topic name
%% @param Block whether to make a blocking request (wait until the topic id exists)
%%
%% @end
-spec get_topic_id_from_name(client(), string(), boolean()) -> {ok, topic_id()} | none.
get_topic_id_from_name(Client, TopicName, Block) ->
    State = gen_statem:call(Client, get_state),
    #state{topic_name_id = Map} = State,
    case {dict:find(TopicName, Map), Block} of
        {{ok, TopicId}, _} ->
            {ok, TopicId};
        {error, false} ->
            %% dict:find/2 reports a miss as error; the documented result of
            %% this function for a miss is none.
            none;
        {error, true} ->
            %% Not registered yet: pause 10 ms before polling again instead
            %% of hammering the state machine in a tight loop.
            receive after 10 -> ok end,
            get_topic_id_from_name(Client, TopicName, true)
    end.
247
248 %%--------------------------------------------------------------------
249 %% gateway management API
250 %%--------------------------------------------------------------------
251
%% @doc store gateway into ets
%%
%% A gateway id that is not stored yet is always inserted. For an id
%% that is already stored, the new entry replaces the old one only
%% when its from field compares greater than or equal to the stored
%% one; otherwise the insert is rejected.
%%
%% Returns true when the entry was written and false when it was rejected.
%%
%% @param Name unique name of client, used also as client id
%% @param GWInfo gateway information (including gateway id, host, port and come-from source)
%%
%% @end
-spec store_gw(string(), #gw_info{}) -> boolean().
store_gw(Name, GWInfo = #gw_info{id = GWId, from = SRC}) ->
    NameGW = list_to_atom(Name),
    case ets:lookup(NameGW, GWId) of
        [#gw_info{from = OldSRC}] when SRC < OldSRC ->
            ?LOGP(warning, "insert into ~p for gateway id:~p failed", [Name, GWId]),
            false;
        [] ->
            ets:insert(NameGW, GWInfo),
            ?LOGP(notice, "insert into ~p for gateway id:~p", [Name, GWId]),
            true;
        [#gw_info{from = OldSRC}] when SRC >= OldSRC ->
            ets:insert(NameGW, GWInfo),
            %% The format string has a single ~p, so pass exactly one argument.
            ?LOGP(warning, "substitude gateway id:~p", [GWId]),
            true
    end.
274
%% @doc fetch an existing gateway from ets by gateway id without blocking (commonly used)
%%
%% Shorthand for get_gw/3 with blocking turned off.
%%
%% @param Name unique name of client, used also as client id
%% @param GWId gateway id to identify a gateway
%%
%% @end
-spec get_gw(string(), gw_id()) -> #gw_info{} | none.
get_gw(Name, GWId) ->
    Block = false,
    get_gw(Name, GWId, Block).
284
%% @doc fetch an existing gateway from ets by gateway id
%%
%% Returns the stored gateway record when the id is present. When it
%% is absent, a non-blocking request returns none, while a blocking
%% request keeps looking the id up, pausing briefly between attempts,
%% until it appears.
%%
%% @param Name unique name of client, used also as client id
%% @param GWId gateway id to identify a gateway
%% @param Block whether to make a blocking request (wait until the gateway exists)
%%
%% @end
-spec get_gw(string(), gw_id(), boolean()) -> #gw_info{} | none.
get_gw(Name, GWId, Block) ->
    NameGW = list_to_atom(Name),
    case {ets:lookup(NameGW, GWId), Block} of
        {[GWInfo = #gw_info{}], _} ->
            GWInfo;
        {[], false} ->
            ?LOGP(warning, "not gateway to fetch from ~p for selected gateway id:~p", [Name, GWId]),
            none;
        {[], true} ->
            ?LOGP(debug,
                  "not gateway to fetch from ~p for selected gateway id:~p, try "
                  "again",
                  [Name, GWId]),
            %% Pause 10 ms before retrying so that waiting does not spin on
            %% the table and flood the log.
            receive after 10 -> ok end,
            get_gw(Name, GWId, Block)
    end.
308
%% @doc fetch all existing gateways from ets without blocking (commonly used)
%%
%% Shorthand for get_all_gw/2 with blocking turned off.
%%
%% @param Name unique name of client, used also as client id
%%
%% @end
-spec get_all_gw(string()) -> [#gw_info{}].
get_all_gw(Name) ->
    Block = false,
    get_all_gw(Name, Block).
317
%% @doc fetch all existing gateways from ets
%%
%% Returns every record stored in the gateway table of the client.
%% When the table is empty, a non-blocking request returns an empty
%% list, while a blocking request keeps reading the table, pausing
%% briefly between attempts, until it holds at least one gateway.
%%
%% @param Name unique name of client, used also as client id
%% @param Block whether to make a blocking request (wait until a gateway exists)
%%
%% @end
-spec get_all_gw(string(), boolean()) -> [#gw_info{}].
get_all_gw(Name, Block) ->
    NameGW = list_to_atom(Name),
    Ret = ets:tab2list(NameGW),
    case {Ret, Block} of
        {[], false} ->
            ?LOGP(warning, "not gateway to fetch from ~p", [Name]),
            [];
        {[], true} ->
            ?LOGP(debug, "not gateway to fetch from ~p, try again", [Name]),
            %% Pause 10 ms before retrying so that waiting does not spin on
            %% the table and flood the log.
            receive after 10 -> ok end,
            get_all_gw(Name, Block);
        {_, _} ->
            Ret
    end.
Line Hits Source