1 |
|
%%------------------------------------------------------------------------- |
2 |
|
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%------------------------------------------------------------------------- |
16 |
|
|
17 |
|
%% @doc utility designed for message or gateway, often used with aync |
18 |
|
%% message handle. |
19 |
|
-module(emqttsn_utils). |
20 |
|
|
21 |
|
%% @headerfile "emqttsn.hrl" |
22 |
|
|
23 |
|
-include("emqttsn.hrl"). |
24 |
|
-include("logger.hrl"). |
25 |
|
|
26 |
|
-export([init_global_store/1, store_msg/4, get_msg/2, get_msg/1, get_one_topic_msg/3, |
27 |
|
get_all_topic_id/1, get_topic_id_from_name/3, store_gw/2, get_gw/3, get_gw/2, |
28 |
|
get_all_gw/2, get_all_gw/1, default_msg_handler/2]). |
29 |
|
|
30 |
|
%%-------------------------------------------------------------------- |
31 |
|
%% gateway management lower utilities |
32 |
|
%%-------------------------------------------------------------------- |
33 |
|
|
34 |
|
%% @doc init ets storage for gateway |
35 |
|
%% |
36 |
|
%% Caution: not need in most cases, if you don't |
37 |
|
%% know about mqttsn gateway storage, do not use it. |
38 |
|
%% |
39 |
|
%% @param Name unique name of client, used also as client id |
40 |
|
%% |
41 |
|
%% @end |
42 |
|
-spec init_global_store(string()) -> ok. |
43 |
|
init_global_store(Name) -> |
44 |
27 |
NameGW = list_to_atom(Name), |
45 |
27 |
Exist = ets:whereis(NameGW), |
46 |
27 |
case Exist of |
47 |
|
undefined -> |
48 |
27 |
_ = ets:new(NameGW, [{keypos, #gw_info.id}, named_table, public, set]), |
49 |
27 |
ok; |
50 |
|
_ -> |
51 |
:-( |
ok |
52 |
|
end. |
53 |
|
|
54 |
|
%%-------------------------------------------------------------------- |
55 |
|
%% message management lower utilities |
56 |
|
%%-------------------------------------------------------------------- |
57 |
|
|
58 |
|
%% @doc default message handler when sync handle |
59 |
|
%% |
60 |
|
%% Caution: not need in most cases, if you don't |
61 |
|
%% know about message handler, do not use it. |
62 |
|
%% |
63 |
|
%% @param TopicId topic id of incoming message |
64 |
|
%% @param Msg data of incoming message |
65 |
|
%% |
66 |
|
%% @end |
67 |
|
-spec default_msg_handler(topic_id(), string()) -> ok. |
68 |
|
default_msg_handler(TopicId, Msg) -> |
69 |
7 |
?LOGP(notice, "new message: ~p, topic id: ~p", [TopicId, Msg]), |
70 |
7 |
ok. |
71 |
|
|
72 |
|
-spec msg_handler_recall([msg_handler()], topic_id(), string()) -> ok. |
73 |
|
msg_handler_recall(Handlers, TopicId, Msg) -> |
74 |
8 |
lists:foreach(fun(Handler) -> Handler(TopicId, Msg) end, Handlers), |
75 |
8 |
ok. |
76 |
|
|
77 |
|
-spec store_msg_async(dict:dict(topic_id(), queue:queue()), |
78 |
|
dict:dict(topic_id(), non_neg_integer()), |
79 |
|
topic_id(), |
80 |
|
non_neg_integer(), |
81 |
|
string()) -> |
82 |
|
{dict:dict(topic_id(), queue:queue()), |
83 |
|
dict:dict(topic_id(), non_neg_integer())}. |
84 |
|
store_msg_async(MsgManager, MsgCounter, TopicId, TopicMax, Msg) -> |
85 |
4 |
case {dict:find(TopicId, MsgCounter), dict:find(TopicId, MsgManager)} of |
86 |
|
{{ok, OldNum}, {ok, MsgQueue}} when OldNum < TopicMax -> |
87 |
:-( |
Num = OldNum + 1, |
88 |
:-( |
NewQueue = queue:in(Msg, MsgQueue); |
89 |
|
{{ok, OldNum}, {ok, MsgQueue}} when OldNum >= TopicMax -> |
90 |
:-( |
Num = OldNum, |
91 |
:-( |
{{value, _Item}, TmpQueue} = queue:out(MsgQueue), |
92 |
:-( |
NewQueue = queue:in(Msg, TmpQueue); |
93 |
|
{error, error} -> |
94 |
4 |
Num = 0, |
95 |
4 |
NewQueue = queue:from_list([Msg]) |
96 |
|
end, |
97 |
4 |
NewMsgCounter = dict:store(TopicId, Num, MsgCounter), |
98 |
4 |
NewMsgManager = dict:store(TopicId, NewQueue, MsgManager), |
99 |
4 |
{NewMsgManager, NewMsgCounter}. |
100 |
|
|
101 |
|
-spec refresh_msg_manager(topic_id(), |
102 |
|
dict:dict(topic_id(), queue:queue()), |
103 |
|
dict:dict(topic_id(), non_neg_integer())) -> |
104 |
|
{dict:dict(topic_id(), queue:queue()), |
105 |
|
dict:dict(topic_id(), non_neg_integer()), |
106 |
|
[string()]}. |
107 |
|
refresh_msg_manager(TopicId, MsgManager, MsgCounter) -> |
108 |
25 |
case dict:find(TopicId, MsgManager) of |
109 |
|
{ok, MsgQueue} -> |
110 |
4 |
NewMsgManager = dict:erase(TopicId, MsgManager), |
111 |
4 |
NewMsgCounter = dict:erase(TopicId, MsgCounter), |
112 |
4 |
{NewMsgManager, NewMsgCounter, queue:to_list(MsgQueue)}; |
113 |
|
error -> |
114 |
21 |
{MsgManager, MsgCounter, []} |
115 |
|
end. |
116 |
|
|
117 |
|
%%-------------------------------------------------------------------- |
118 |
|
%% message management API |
119 |
|
%%-------------------------------------------------------------------- |
120 |
|
|
121 |
|
%% @doc store an message(async handler API) |
122 |
|
%% |
123 |
|
%% Caution: not need in most cases, if you don't |
124 |
|
%% know about message storage, do not use it. |
125 |
|
%% |
126 |
|
%% @param State state data of state machine |
127 |
|
%% @param TopicId topic id of message |
128 |
|
%% @param TopicMax maximum number of message to stored by one topic |
129 |
|
%% @param Msg data of message |
130 |
|
%% |
131 |
|
%% @end |
132 |
|
-spec store_msg(state(), topic_id(), non_neg_integer(), string()) -> state(). |
133 |
|
store_msg(State, TopicId, TopicMax, Msg) -> |
134 |
12 |
#state{msg_manager = MsgManager, |
135 |
|
msg_counter = MsgCounter, |
136 |
|
config = Config} = |
137 |
|
State, |
138 |
12 |
#config{msg_handler = Handlers} = Config, |
139 |
12 |
case Handlers of |
140 |
|
[] -> |
141 |
4 |
{NewMsgManager, NewMsgCounter} = |
142 |
|
store_msg_async(MsgManager, MsgCounter, TopicId, TopicMax, Msg), |
143 |
4 |
State#state{msg_manager = NewMsgManager, msg_counter = NewMsgCounter}; |
144 |
|
_ -> |
145 |
8 |
msg_handler_recall(Handlers, TopicId, Msg), |
146 |
8 |
State#state{msg_manager = MsgManager, msg_counter = MsgCounter} |
147 |
|
end. |
148 |
|
|
149 |
|
%% @doc collect all message of a topic id(async handler API) |
150 |
|
%% |
151 |
|
%% @param Client the client object |
152 |
|
%% @param TopicId topic id to fetch messages |
153 |
|
%% @param Block whether make a block/unblock reques(wait until message is exist) |
154 |
|
%% |
155 |
|
%% @end |
156 |
|
-spec get_one_topic_msg(client(), topic_id(), boolean()) -> {ok, [string()]} | invalid. |
157 |
|
get_one_topic_msg(Client, TopicId, Block) -> |
158 |
25 |
State = gen_statem:call(Client, get_state), |
159 |
25 |
#state{msg_manager = MsgManager, |
160 |
|
msg_counter = MsgCounter, |
161 |
|
config = Config} = |
162 |
|
State, |
163 |
25 |
#config{msg_handler = Handlers} = Config, |
164 |
25 |
if Handlers =/= [] -> |
165 |
:-( |
invalid; |
166 |
|
Handlers =:= [] -> |
167 |
25 |
{NewMsgManager, NewMsgCounter, Messages} = |
168 |
|
refresh_msg_manager(TopicId, MsgManager, MsgCounter), |
169 |
25 |
case {Block, Messages} of |
170 |
|
{false, _} -> |
171 |
:-( |
gen_statem:cast(Client, {reset_msg, NewMsgManager, NewMsgCounter}), |
172 |
:-( |
{ok, Messages}; |
173 |
|
{true, []} -> |
174 |
21 |
get_one_topic_msg(Client, TopicId, true); |
175 |
|
{true, _} -> |
176 |
4 |
gen_statem:cast(Client, {reset_msg, NewMsgManager, NewMsgCounter}), |
177 |
4 |
{ok, Messages} |
178 |
|
end |
179 |
|
end. |
180 |
|
|
181 |
|
%% @doc collect all message of some topic id(async handler API, non-blocking) |
182 |
|
%% |
183 |
|
%% @param Client the client object |
184 |
|
%% @param TopicIds array of topic id to fetch messages |
185 |
|
%% |
186 |
|
%% @end |
187 |
|
-spec get_msg(client(), [topic_id()]) -> |
188 |
|
{ok, dict:dict(topic_id(), [string()])} | invalid. |
189 |
|
get_msg(Client, TopicIds) -> |
190 |
:-( |
{ok, KVList} = get_msg(Client, TopicIds, []), |
191 |
:-( |
{ok, dict:from_list(KVList)}. |
192 |
|
|
193 |
|
-spec get_msg(client(), [topic_id()], [{topic_id(), [string()]}]) -> |
194 |
|
{ok, [{topic_id(), [string()]}]} | invalid. |
195 |
|
get_msg(_Client, [], Ret) -> |
196 |
:-( |
{ok, Ret}; |
197 |
|
get_msg(Client, [TopicId | TopicIds], Ret) -> |
198 |
:-( |
case get_one_topic_msg(Client, TopicId, false) of |
199 |
|
invalid -> |
200 |
:-( |
invalid; |
201 |
|
{ok, MsgOfTopicId} -> |
202 |
:-( |
get_msg(Client, TopicIds, Ret ++ [{TopicId, MsgOfTopicId}]) |
203 |
|
end. |
204 |
|
|
205 |
|
%% @doc collect all message(async handler API, non-blocking) |
206 |
|
%% |
207 |
|
%% @param Client the client object |
208 |
|
%% @equiv get_msg(Client, AllTopic) |
209 |
|
%% |
210 |
|
%% @end |
211 |
|
-spec get_msg(client()) -> {ok, dict:dict(topic_id(), [string()])} | invalid. |
212 |
|
get_msg(Client) -> |
213 |
:-( |
State = gen_statem:call(Client, get_state), |
214 |
:-( |
#state{msg_manager = MsgManager} = State, |
215 |
:-( |
get_msg(Client, dict:fetch_keys(MsgManager)). |
216 |
|
|
217 |
|
%% @doc collect all topic id(non-blocking) |
218 |
|
%% |
219 |
|
%% @param Client the client object |
220 |
|
%% |
221 |
|
%% @end |
222 |
|
-spec get_all_topic_id(client()) -> [topic_id()]. |
223 |
|
get_all_topic_id(Client) -> |
224 |
:-( |
State = gen_statem:call(Client, get_state), |
225 |
:-( |
#state{msg_manager = MsgManager} = State, |
226 |
:-( |
dict:fetch_keys(MsgManager). |
227 |
|
|
228 |
|
%% @doc get topic id from topic name |
229 |
|
%% |
230 |
|
%% @param Client the client object |
231 |
|
%% @param TopicName topic name |
232 |
|
%% @param Block whether make a block/unblock request(wait until topic id is exist) |
233 |
|
%% |
234 |
|
%% @end |
235 |
|
-spec get_topic_id_from_name(client(), string(), boolean()) -> {ok, topic_id()} | none. |
236 |
|
get_topic_id_from_name(Client, TopicName, Block) -> |
237 |
3 |
State = gen_statem:call(Client, get_state), |
238 |
3 |
#state{topic_name_id = Map} = State, |
239 |
3 |
case {Block, dict:find(TopicName, Map)} of |
240 |
|
{false, Ret} -> |
241 |
:-( |
Ret; |
242 |
|
{true, {ok, TopicId}} -> |
243 |
3 |
{ok, TopicId}; |
244 |
|
{true, error} -> |
245 |
:-( |
get_topic_id_from_name(Client, TopicName, true) |
246 |
|
end. |
247 |
|
|
248 |
|
%%-------------------------------------------------------------------- |
249 |
|
%% gateway management API |
250 |
|
%%-------------------------------------------------------------------- |
251 |
|
|
252 |
|
%% @doc store gateway into ets |
253 |
|
%% |
254 |
|
%% @param Name unique name of client, used also as client id |
255 |
|
%% @param GWInfo gateway information(including gateway id, host, port and come-from source) |
256 |
|
%% |
257 |
|
%% @end |
258 |
|
-spec store_gw(string(), #gw_info{}) -> boolean(). |
259 |
|
store_gw(Name, GWInfo = #gw_info{id = GWId, from = SRC}) -> |
260 |
23 |
NameGW = list_to_atom(Name), |
261 |
23 |
case ets:lookup(NameGW, GWId) of |
262 |
|
[#gw_info{from = OldSRC}] when SRC < OldSRC -> |
263 |
1 |
?LOGP(warning, "insert into ~p for gateway id:~p failed", [Name, GWId]), |
264 |
1 |
false; |
265 |
|
[] -> |
266 |
22 |
ets:insert(NameGW, GWInfo), |
267 |
22 |
?LOGP(notice, "insert into ~p for gateway id:~p", [Name, GWId]), |
268 |
22 |
true; |
269 |
|
[#gw_info{from = OldSRC}] when SRC >= OldSRC -> |
270 |
:-( |
ets:insert(NameGW, GWInfo), |
271 |
:-( |
?LOGP(warning, "substitude gateway id:~p", [Name, GWId]), |
272 |
:-( |
true |
273 |
|
end. |
274 |
|
|
275 |
|
%% @doc fetch existing gateway from ets by gateway id unblockly(common used) |
276 |
|
%% |
277 |
|
%% @param Name unique name of client, used also as client id |
278 |
|
%% @param GWId gateway id to identify an gateway |
279 |
|
%% |
280 |
|
%% @end |
281 |
|
-spec get_gw(string(), gw_id()) -> #gw_info{} | none. |
282 |
|
get_gw(Name, GWId) -> |
283 |
19 |
get_gw(Name, GWId, false). |
284 |
|
|
285 |
|
%% @doc fetch existing gateway from ets by gateway id |
286 |
|
%% |
287 |
|
%% @param Name unique name of client, used also as client id |
288 |
|
%% @param GWId gateway id to identify an gateway |
289 |
|
%% @param Block whether make a block/unblock request(wait until gateway is exist) |
290 |
|
%% |
291 |
|
%% @end |
292 |
|
-spec get_gw(string(), gw_id(), boolean()) -> #gw_info{} | none. |
293 |
|
get_gw(Name, GWId, Block) -> |
294 |
2005 |
NameGW = list_to_atom(Name), |
295 |
2005 |
case {ets:lookup(NameGW, GWId), Block} of |
296 |
|
{[GWInfo = #gw_info{}], _} -> |
297 |
22 |
GWInfo; |
298 |
|
{[], false} -> |
299 |
:-( |
?LOGP(warning, "not gateway to fetch from ~p for selected gateway id:~p", [Name, GWId]), |
300 |
:-( |
none; |
301 |
|
{[], true} -> |
302 |
1983 |
?LOGP(debug, |
303 |
|
"not gateway to fetch from ~p for selected gateway id:~p, try " |
304 |
|
"again", |
305 |
1983 |
[Name, GWId]), |
306 |
1983 |
get_gw(Name, GWId, Block) |
307 |
|
end. |
308 |
|
|
309 |
|
%% @doc fetch all existing gateway from ets unblockly(common used) |
310 |
|
%% |
311 |
|
%% @param Name unique name of client, used also as client id |
312 |
|
%% |
313 |
|
%% @end |
314 |
|
-spec get_all_gw(string()) -> [#gw_info{}]. |
315 |
|
get_all_gw(Name) -> |
316 |
:-( |
get_all_gw(Name, false). |
317 |
|
|
318 |
|
%% @doc fetch all existing gateway from ets |
319 |
|
%% |
320 |
|
%% @param Name unique name of client, used also as client id |
321 |
|
%% @param Block whether make a block/unblock request(wait until gateway is exist) |
322 |
|
%% |
323 |
|
%% @end |
324 |
|
-spec get_all_gw(string(), boolean()) -> [#gw_info{}]. |
325 |
|
get_all_gw(Name, Block) -> |
326 |
:-( |
NameGW = list_to_atom(Name), |
327 |
:-( |
Ret = ets:tab2list(NameGW), |
328 |
:-( |
case {Ret, Block} of |
329 |
|
{[], false} -> |
330 |
:-( |
?LOGP(warning, "not gateway to fetch from ~p", [Name]), |
331 |
:-( |
[]; |
332 |
|
{[], true} -> |
333 |
:-( |
?LOGP(debug, "not gateway to fetch from ~p, try again", [Name]), |
334 |
:-( |
get_all_gw(Name, Block); |
335 |
|
{_, _} -> |
336 |
:-( |
Ret |
337 |
|
end. |