Improve the moudules and fix the bugs found in new test cases
- Fix the bug that emqx_mountpoint:unmount/2 will throw exception - Add emqx_banned:info/1 for test cases - Rename macro TRIE in emqx_trie module to TRIE_TAB - Rename macro TRIE_NODE in emqx_trie module to TRIE_NODE_TAB - Rename macro ROUTE in emqx_router module to ROUTE_TAB
This commit is contained in:
parent
4afa02ee48
commit
f60f127681
|
@ -31,7 +31,7 @@
|
|||
[{deps,
|
||||
[{meck, "0.8.13"}, % hex
|
||||
{bbmustache, "1.7.0"}, % hex
|
||||
{emqx_ct_helpers, "1.1.3"} % hex
|
||||
{emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {branch, "develop"}}}
|
||||
]}
|
||||
]}
|
||||
]}.
|
||||
|
|
|
@ -32,9 +32,10 @@
|
|||
|
||||
-export([start_link/0]).
|
||||
|
||||
-export([ add/1
|
||||
-export([ check/1
|
||||
, add/1
|
||||
, delete/1
|
||||
, check/1
|
||||
, info/1
|
||||
]).
|
||||
|
||||
%% gen_server callbacks
|
||||
|
@ -81,7 +82,11 @@ add(Banned) when is_record(Banned, banned) ->
|
|||
-spec(delete({client_id, emqx_types:client_id()} |
|
||||
{username, emqx_types:username()} |
|
||||
{peername, emqx_types:peername()}) -> ok).
|
||||
delete(Key) -> mnesia:dirty_delete(?BANNED_TAB, Key).
|
||||
delete(Key) ->
|
||||
mnesia:dirty_delete(?BANNED_TAB, Key).
|
||||
|
||||
info(InfoKey) ->
|
||||
mnesia:table_info(?BANNED_TAB, InfoKey).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% gen_server callbacks
|
||||
|
|
|
@ -16,8 +16,11 @@
|
|||
|
||||
-module(emqx_inflight).
|
||||
|
||||
-compile(inline).
|
||||
|
||||
%% APIs
|
||||
-export([ new/1
|
||||
-export([ new/0
|
||||
, new/1
|
||||
, contain/2
|
||||
, lookup/2
|
||||
, insert/3
|
||||
|
@ -45,9 +48,8 @@
|
|||
|
||||
-define(Inflight(MaxSize, Tree), {inflight, MaxSize, (Tree)}).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% APIs
|
||||
%%--------------------------------------------------------------------
|
||||
-spec(new() -> inflight()).
|
||||
new() -> new(0).
|
||||
|
||||
-spec(new(non_neg_integer()) -> inflight()).
|
||||
new(MaxSize) when MaxSize >= 0 ->
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
|
||||
-module(emqx_json).
|
||||
|
||||
-compile(inline).
|
||||
|
||||
-export([ encode/1
|
||||
, encode/2
|
||||
, safe_encode/1
|
||||
|
@ -32,7 +34,8 @@
|
|||
encode(Term) ->
|
||||
jsx:encode(Term).
|
||||
|
||||
-spec(encode(jsx:json_term(), jsx_to_json:config()) -> jsx:json_text()).
|
||||
-spec(encode(jsx:json_term(), jsx_to_json:config())
|
||||
-> jsx:json_text()).
|
||||
encode(Term, Opts) ->
|
||||
jsx:encode(Term, Opts).
|
||||
|
||||
|
@ -55,7 +58,8 @@ safe_encode(Term, Opts) ->
|
|||
decode(Json) ->
|
||||
jsx:decode(Json).
|
||||
|
||||
-spec(decode(jsx:json_text(), jsx_to_json:config()) -> jsx:json_term()).
|
||||
-spec(decode(jsx:json_text(), jsx_to_json:config())
|
||||
-> jsx:json_term()).
|
||||
decode(Json, Opts) ->
|
||||
jsx:decode(Json, Opts).
|
||||
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
-module(emqx_mountpoint).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include("logger.hrl").
|
||||
-include("types.hrl").
|
||||
|
||||
-export([ mount/2
|
||||
, unmount/2
|
||||
|
@ -29,41 +29,46 @@
|
|||
|
||||
-type(mountpoint() :: binary()).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% APIs
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec(mount(maybe(mountpoint()), Any) -> Any
|
||||
when Any :: emqx_types:topic()
|
||||
| emqx_types:message()
|
||||
| emqx_types:topic_filters()).
|
||||
mount(undefined, Any) ->
|
||||
Any;
|
||||
mount(MountPoint, Topic) when is_binary(Topic) ->
|
||||
<<MountPoint/binary, Topic/binary>>;
|
||||
prefix(MountPoint, Topic);
|
||||
mount(MountPoint, Msg = #message{topic = Topic}) ->
|
||||
Msg#message{topic = <<MountPoint/binary, Topic/binary>>};
|
||||
Msg#message{topic = prefix(MountPoint, Topic)};
|
||||
mount(MountPoint, TopicFilters) when is_list(TopicFilters) ->
|
||||
[{<<MountPoint/binary, Topic/binary>>, SubOpts}
|
||||
|| {Topic, SubOpts} <- TopicFilters].
|
||||
[{prefix(MountPoint, Topic), SubOpts} || {Topic, SubOpts} <- TopicFilters].
|
||||
|
||||
unmount(undefined, Msg) ->
|
||||
Msg;
|
||||
%% TODO: Fixme later
|
||||
%% @private
|
||||
-compile({inline, [prefix/2]}).
|
||||
prefix(MountPoint, Topic) ->
|
||||
<<MountPoint/binary, Topic/binary>>.
|
||||
|
||||
-spec(unmount(maybe(mountpoint()), Any) -> Any
|
||||
when Any :: emqx_types:topic()
|
||||
| emqx_types:message()).
|
||||
unmount(undefined, Any) ->
|
||||
Any;
|
||||
unmount(MountPoint, Topic) when is_binary(Topic) ->
|
||||
try split_binary(Topic, byte_size(MountPoint)) of
|
||||
{MountPoint, Topic1} -> Topic1
|
||||
catch
|
||||
error:badarg-> Topic
|
||||
case string:prefix(Topic, MountPoint) of
|
||||
nomatch -> Topic;
|
||||
Topic1 -> Topic1
|
||||
end;
|
||||
unmount(MountPoint, Msg = #message{topic = Topic}) ->
|
||||
try split_binary(Topic, byte_size(MountPoint)) of
|
||||
{MountPoint, Topic1} -> Msg#message{topic = Topic1}
|
||||
catch
|
||||
error:badarg->
|
||||
Msg
|
||||
case string:prefix(Topic, MountPoint) of
|
||||
nomatch -> Msg;
|
||||
Topic1 -> Msg#message{topic = Topic1}
|
||||
end.
|
||||
|
||||
-spec(replvar(maybe(mountpoint()), map()) -> maybe(mountpoint())).
|
||||
replvar(undefined, _Vars) ->
|
||||
undefined;
|
||||
replvar(MountPoint, #{client_id := ClientId, username := Username}) ->
|
||||
lists:foldl(fun feed_var/2, MountPoint, [{<<"%c">>, ClientId}, {<<"%u">>, Username}]).
|
||||
lists:foldl(fun feed_var/2, MountPoint,
|
||||
[{<<"%c">>, ClientId}, {<<"%u">>, Username}]).
|
||||
|
||||
feed_var({<<"%c">>, ClientId}, MountPoint) ->
|
||||
emqx_topic:feed_var(<<"%c">>, ClientId, MountPoint);
|
||||
|
|
|
@ -163,5 +163,7 @@ connack_error(banned) -> ?RC_BANNED;
|
|||
connack_error(bad_authentication_method) -> ?RC_BAD_AUTHENTICATION_METHOD;
|
||||
connack_error(_) -> ?RC_NOT_AUTHORIZED.
|
||||
|
||||
%%TODO: This function should be removed.
|
||||
puback([]) -> ?RC_NO_MATCHING_SUBSCRIBERS;
|
||||
puback(L) when is_list(L) -> ?RC_SUCCESS.
|
||||
|
||||
|
|
|
@ -66,16 +66,16 @@
|
|||
|
||||
-type(group() :: binary()).
|
||||
|
||||
-type(destination() :: node() | {group(), node()}).
|
||||
-type(dest() :: node() | {group(), node()}).
|
||||
|
||||
-define(ROUTE, emqx_route).
|
||||
-define(ROUTE_TAB, emqx_route).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Mnesia bootstrap
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
mnesia(boot) ->
|
||||
ok = ekka_mnesia:create_table(?ROUTE, [
|
||||
ok = ekka_mnesia:create_table(?ROUTE_TAB, [
|
||||
{type, bag},
|
||||
{ram_copies, [node()]},
|
||||
{record_name, route},
|
||||
|
@ -83,7 +83,7 @@ mnesia(boot) ->
|
|||
{storage_properties, [{ets, [{read_concurrency, true},
|
||||
{write_concurrency, true}]}]}]);
|
||||
mnesia(copy) ->
|
||||
ok = ekka_mnesia:copy_table(?ROUTE).
|
||||
ok = ekka_mnesia:copy_table(?ROUTE_TAB).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Start a router
|
||||
|
@ -102,7 +102,7 @@ start_link(Pool, Id) ->
|
|||
add_route(Topic) when is_binary(Topic) ->
|
||||
add_route(Topic, node()).
|
||||
|
||||
-spec(add_route(emqx_topic:topic(), destination()) -> ok | {error, term()}).
|
||||
-spec(add_route(emqx_topic:topic(), dest()) -> ok | {error, term()}).
|
||||
add_route(Topic, Dest) when is_binary(Topic) ->
|
||||
call(pick(Topic), {add_route, Topic, Dest}).
|
||||
|
||||
|
@ -110,7 +110,7 @@ add_route(Topic, Dest) when is_binary(Topic) ->
|
|||
do_add_route(Topic) when is_binary(Topic) ->
|
||||
do_add_route(Topic, node()).
|
||||
|
||||
-spec(do_add_route(emqx_topic:topic(), destination()) -> ok | {error, term()}).
|
||||
-spec(do_add_route(emqx_topic:topic(), dest()) -> ok | {error, term()}).
|
||||
do_add_route(Topic, Dest) when is_binary(Topic) ->
|
||||
Route = #route{topic = Topic, dest = Dest},
|
||||
case lists:member(Route, lookup_routes(Topic)) of
|
||||
|
@ -142,17 +142,17 @@ match_trie(Topic) ->
|
|||
|
||||
-spec(lookup_routes(emqx_topic:topic()) -> [emqx_types:route()]).
|
||||
lookup_routes(Topic) ->
|
||||
ets:lookup(?ROUTE, Topic).
|
||||
ets:lookup(?ROUTE_TAB, Topic).
|
||||
|
||||
-spec(has_routes(emqx_topic:topic()) -> boolean()).
|
||||
has_routes(Topic) when is_binary(Topic) ->
|
||||
ets:member(?ROUTE, Topic).
|
||||
ets:member(?ROUTE_TAB, Topic).
|
||||
|
||||
-spec(delete_route(emqx_topic:topic()) -> ok | {error, term()}).
|
||||
delete_route(Topic) when is_binary(Topic) ->
|
||||
delete_route(Topic, node()).
|
||||
|
||||
-spec(delete_route(emqx_topic:topic(), destination()) -> ok | {error, term()}).
|
||||
-spec(delete_route(emqx_topic:topic(), dest()) -> ok | {error, term()}).
|
||||
delete_route(Topic, Dest) when is_binary(Topic) ->
|
||||
call(pick(Topic), {delete_route, Topic, Dest}).
|
||||
|
||||
|
@ -160,7 +160,7 @@ delete_route(Topic, Dest) when is_binary(Topic) ->
|
|||
do_delete_route(Topic) when is_binary(Topic) ->
|
||||
do_delete_route(Topic, node()).
|
||||
|
||||
-spec(do_delete_route(emqx_topic:topic(), destination()) -> ok | {error, term()}).
|
||||
-spec(do_delete_route(emqx_topic:topic(), dest()) -> ok | {error, term()}).
|
||||
do_delete_route(Topic, Dest) ->
|
||||
Route = #route{topic = Topic, dest = Dest},
|
||||
case emqx_topic:wildcard(Topic) of
|
||||
|
@ -170,7 +170,7 @@ do_delete_route(Topic, Dest) ->
|
|||
|
||||
-spec(topics() -> list(emqx_topic:topic())).
|
||||
topics() ->
|
||||
mnesia:dirty_all_keys(?ROUTE).
|
||||
mnesia:dirty_all_keys(?ROUTE_TAB).
|
||||
|
||||
%% @doc Print routes to a topic
|
||||
-spec(print_routes(emqx_topic:topic()) -> ok).
|
||||
|
@ -224,25 +224,25 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
insert_direct_route(Route) ->
|
||||
mnesia:async_dirty(fun mnesia:write/3, [?ROUTE, Route, sticky_write]).
|
||||
mnesia:async_dirty(fun mnesia:write/3, [?ROUTE_TAB, Route, sticky_write]).
|
||||
|
||||
insert_trie_route(Route = #route{topic = Topic}) ->
|
||||
case mnesia:wread({?ROUTE, Topic}) of
|
||||
case mnesia:wread({?ROUTE_TAB, Topic}) of
|
||||
[] -> emqx_trie:insert(Topic);
|
||||
_ -> ok
|
||||
end,
|
||||
mnesia:write(?ROUTE, Route, sticky_write).
|
||||
mnesia:write(?ROUTE_TAB, Route, sticky_write).
|
||||
|
||||
delete_direct_route(Route) ->
|
||||
mnesia:async_dirty(fun mnesia:delete_object/3, [?ROUTE, Route, sticky_write]).
|
||||
mnesia:async_dirty(fun mnesia:delete_object/3, [?ROUTE_TAB, Route, sticky_write]).
|
||||
|
||||
delete_trie_route(Route = #route{topic = Topic}) ->
|
||||
case mnesia:wread({?ROUTE, Topic}) of
|
||||
case mnesia:wread({?ROUTE_TAB, Topic}) of
|
||||
[Route] -> %% Remove route and trie
|
||||
ok = mnesia:delete_object(?ROUTE, Route, sticky_write),
|
||||
ok = mnesia:delete_object(?ROUTE_TAB, Route, sticky_write),
|
||||
emqx_trie:delete(Topic);
|
||||
[_|_] -> %% Remove route only
|
||||
mnesia:delete_object(?ROUTE, Route, sticky_write);
|
||||
mnesia:delete_object(?ROUTE_TAB, Route, sticky_write);
|
||||
[] -> ok
|
||||
end.
|
||||
|
||||
|
|
|
@ -34,8 +34,8 @@
|
|||
-export([empty/0]).
|
||||
|
||||
%% Mnesia tables
|
||||
-define(TRIE, emqx_trie).
|
||||
-define(TRIE_NODE, emqx_trie_node).
|
||||
-define(TRIE_TAB, emqx_trie).
|
||||
-define(TRIE_NODE_TAB, emqx_trie_node).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Mnesia bootstrap
|
||||
|
@ -48,13 +48,13 @@ mnesia(boot) ->
|
|||
StoreProps = [{ets, [{read_concurrency, true},
|
||||
{write_concurrency, true}]}],
|
||||
%% Trie table
|
||||
ok = ekka_mnesia:create_table(?TRIE, [
|
||||
ok = ekka_mnesia:create_table(?TRIE_TAB, [
|
||||
{ram_copies, [node()]},
|
||||
{record_name, trie},
|
||||
{attributes, record_info(fields, trie)},
|
||||
{storage_properties, StoreProps}]),
|
||||
%% Trie node table
|
||||
ok = ekka_mnesia:create_table(?TRIE_NODE, [
|
||||
ok = ekka_mnesia:create_table(?TRIE_NODE_TAB, [
|
||||
{ram_copies, [node()]},
|
||||
{record_name, trie_node},
|
||||
{attributes, record_info(fields, trie_node)},
|
||||
|
@ -62,9 +62,9 @@ mnesia(boot) ->
|
|||
|
||||
mnesia(copy) ->
|
||||
%% Copy trie table
|
||||
ok = ekka_mnesia:copy_table(?TRIE),
|
||||
ok = ekka_mnesia:copy_table(?TRIE_TAB),
|
||||
%% Copy trie_node table
|
||||
ok = ekka_mnesia:copy_table(?TRIE_NODE).
|
||||
ok = ekka_mnesia:copy_table(?TRIE_NODE_TAB).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Trie APIs
|
||||
|
@ -73,7 +73,7 @@ mnesia(copy) ->
|
|||
%% @doc Insert a topic filter into the trie.
|
||||
-spec(insert(emqx_topic:topic()) -> ok).
|
||||
insert(Topic) when is_binary(Topic) ->
|
||||
case mnesia:wread({?TRIE_NODE, Topic}) of
|
||||
case mnesia:wread({?TRIE_NODE_TAB, Topic}) of
|
||||
[#trie_node{topic = Topic}] ->
|
||||
ok;
|
||||
[TrieNode = #trie_node{topic = undefined}] ->
|
||||
|
@ -94,14 +94,14 @@ match(Topic) when is_binary(Topic) ->
|
|||
%% @doc Lookup a trie node.
|
||||
-spec(lookup(NodeId :: binary()) -> [#trie_node{}]).
|
||||
lookup(NodeId) ->
|
||||
mnesia:read(?TRIE_NODE, NodeId).
|
||||
mnesia:read(?TRIE_NODE_TAB, NodeId).
|
||||
|
||||
%% @doc Delete a topic filter from the trie.
|
||||
-spec(delete(emqx_topic:topic()) -> ok).
|
||||
delete(Topic) when is_binary(Topic) ->
|
||||
case mnesia:wread({?TRIE_NODE, Topic}) of
|
||||
case mnesia:wread({?TRIE_NODE_TAB, Topic}) of
|
||||
[#trie_node{edge_count = 0}] ->
|
||||
ok = mnesia:delete({?TRIE_NODE, Topic}),
|
||||
ok = mnesia:delete({?TRIE_NODE_TAB, Topic}),
|
||||
delete_path(lists:reverse(emqx_topic:triples(Topic)));
|
||||
[TrieNode] ->
|
||||
write_trie_node(TrieNode#trie_node{topic = undefined});
|
||||
|
@ -111,7 +111,7 @@ delete(Topic) when is_binary(Topic) ->
|
|||
%% @doc Is the trie empty?
|
||||
-spec(empty() -> boolean()).
|
||||
empty() ->
|
||||
ets:info(?TRIE, size) == 0.
|
||||
ets:info(?TRIE_TAB, size) == 0.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
|
@ -121,9 +121,9 @@ empty() ->
|
|||
%% @doc Add a path to the trie.
|
||||
add_path({Node, Word, Child}) ->
|
||||
Edge = #trie_edge{node_id = Node, word = Word},
|
||||
case mnesia:wread({?TRIE_NODE, Node}) of
|
||||
case mnesia:wread({?TRIE_NODE_TAB, Node}) of
|
||||
[TrieNode = #trie_node{edge_count = Count}] ->
|
||||
case mnesia:wread({?TRIE, Edge}) of
|
||||
case mnesia:wread({?TRIE_TAB, Edge}) of
|
||||
[] ->
|
||||
ok = write_trie_node(TrieNode#trie_node{edge_count = Count + 1}),
|
||||
write_trie(#trie{edge = Edge, node_id = Child});
|
||||
|
@ -143,11 +143,11 @@ match_node(NodeId, Words) ->
|
|||
match_node(NodeId, Words, []).
|
||||
|
||||
match_node(NodeId, [], ResAcc) ->
|
||||
mnesia:read(?TRIE_NODE, NodeId) ++ 'match_#'(NodeId, ResAcc);
|
||||
mnesia:read(?TRIE_NODE_TAB, NodeId) ++ 'match_#'(NodeId, ResAcc);
|
||||
|
||||
match_node(NodeId, [W|Words], ResAcc) ->
|
||||
lists:foldl(fun(WArg, Acc) ->
|
||||
case mnesia:read(?TRIE, #trie_edge{node_id = NodeId, word = WArg}) of
|
||||
case mnesia:read(?TRIE_TAB, #trie_edge{node_id = NodeId, word = WArg}) of
|
||||
[#trie{node_id = ChildId}] -> match_node(ChildId, Words, Acc);
|
||||
[] -> Acc
|
||||
end
|
||||
|
@ -156,9 +156,9 @@ match_node(NodeId, [W|Words], ResAcc) ->
|
|||
%% @private
|
||||
%% @doc Match node with '#'.
|
||||
'match_#'(NodeId, ResAcc) ->
|
||||
case mnesia:read(?TRIE, #trie_edge{node_id = NodeId, word = '#'}) of
|
||||
case mnesia:read(?TRIE_TAB, #trie_edge{node_id = NodeId, word = '#'}) of
|
||||
[#trie{node_id = ChildId}] ->
|
||||
mnesia:read(?TRIE_NODE, ChildId) ++ ResAcc;
|
||||
mnesia:read(?TRIE_NODE_TAB, ChildId) ++ ResAcc;
|
||||
[] -> ResAcc
|
||||
end.
|
||||
|
||||
|
@ -167,10 +167,10 @@ match_node(NodeId, [W|Words], ResAcc) ->
|
|||
delete_path([]) ->
|
||||
ok;
|
||||
delete_path([{NodeId, Word, _} | RestPath]) ->
|
||||
ok = mnesia:delete({?TRIE, #trie_edge{node_id = NodeId, word = Word}}),
|
||||
case mnesia:wread({?TRIE_NODE, NodeId}) of
|
||||
ok = mnesia:delete({?TRIE_TAB, #trie_edge{node_id = NodeId, word = Word}}),
|
||||
case mnesia:wread({?TRIE_NODE_TAB, NodeId}) of
|
||||
[#trie_node{edge_count = 1, topic = undefined}] ->
|
||||
ok = mnesia:delete({?TRIE_NODE, NodeId}),
|
||||
ok = mnesia:delete({?TRIE_NODE_TAB, NodeId}),
|
||||
delete_path(RestPath);
|
||||
[TrieNode = #trie_node{edge_count = 1, topic = _}] ->
|
||||
write_trie_node(TrieNode#trie_node{edge_count = 0});
|
||||
|
@ -182,9 +182,9 @@ delete_path([{NodeId, Word, _} | RestPath]) ->
|
|||
|
||||
%% @private
|
||||
write_trie(Trie) ->
|
||||
mnesia:write(?TRIE, Trie, write).
|
||||
mnesia:write(?TRIE_TAB, Trie, write).
|
||||
|
||||
%% @private
|
||||
write_trie_node(TrieNode) ->
|
||||
mnesia:write(?TRIE_NODE, TrieNode, write).
|
||||
mnesia:write(?TRIE_NODE_TAB, TrieNode, write).
|
||||
|
||||
|
|
Loading…
Reference in New Issue