diff --git a/rebar.config b/rebar.config index c28a43276..15b7b5a0e 100644 --- a/rebar.config +++ b/rebar.config @@ -31,7 +31,7 @@ [{deps, [{meck, "0.8.13"}, % hex {bbmustache, "1.7.0"}, % hex - {emqx_ct_helpers, "1.1.3"} % hex + {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {branch, "develop"}}} ]} ]} ]}. diff --git a/src/emqx_banned.erl b/src/emqx_banned.erl index c38552bd7..ba3743385 100644 --- a/src/emqx_banned.erl +++ b/src/emqx_banned.erl @@ -32,9 +32,10 @@ -export([start_link/0]). --export([ add/1 +-export([ check/1 + , add/1 , delete/1 - , check/1 + , info/1 ]). %% gen_server callbacks @@ -81,7 +82,11 @@ add(Banned) when is_record(Banned, banned) -> -spec(delete({client_id, emqx_types:client_id()} | {username, emqx_types:username()} | {peername, emqx_types:peername()}) -> ok). -delete(Key) -> mnesia:dirty_delete(?BANNED_TAB, Key). +delete(Key) -> + mnesia:dirty_delete(?BANNED_TAB, Key). + +info(InfoKey) -> + mnesia:table_info(?BANNED_TAB, InfoKey). %%-------------------------------------------------------------------- %% gen_server callbacks diff --git a/src/emqx_inflight.erl b/src/emqx_inflight.erl index 1bf70f4c8..a51abdc68 100644 --- a/src/emqx_inflight.erl +++ b/src/emqx_inflight.erl @@ -16,8 +16,11 @@ -module(emqx_inflight). +-compile(inline). + %% APIs --export([ new/1 +-export([ new/0 + , new/1 , contain/2 , lookup/2 , insert/3 @@ -45,9 +48,8 @@ -define(Inflight(MaxSize, Tree), {inflight, MaxSize, (Tree)}). -%%-------------------------------------------------------------------- -%% APIs -%%-------------------------------------------------------------------- +-spec(new() -> inflight()). +new() -> new(0). -spec(new(non_neg_integer()) -> inflight()). new(MaxSize) when MaxSize >= 0 -> diff --git a/src/emqx_json.erl b/src/emqx_json.erl index c9130646f..665e7ad57 100644 --- a/src/emqx_json.erl +++ b/src/emqx_json.erl @@ -16,6 +16,8 @@ -module(emqx_json). +-compile(inline). + -export([ encode/1 , encode/2 , safe_encode/1 @@ -32,7 +34,8 @@ encode(Term) -> jsx:encode(Term). --spec(encode(jsx:json_term(), jsx_to_json:config()) -> jsx:json_text()). +-spec(encode(jsx:json_term(), jsx_to_json:config()) + -> jsx:json_text()). encode(Term, Opts) -> jsx:encode(Term, Opts). @@ -55,7 +58,8 @@ safe_encode(Term, Opts) -> decode(Json) -> jsx:decode(Json). --spec(decode(jsx:json_text(), jsx_to_json:config()) -> jsx:json_term()). +-spec(decode(jsx:json_text(), jsx_to_json:config()) + -> jsx:json_term()). decode(Json, Opts) -> jsx:decode(Json, Opts). diff --git a/src/emqx_mountpoint.erl b/src/emqx_mountpoint.erl index 4b820fdc7..986bbd048 100644 --- a/src/emqx_mountpoint.erl +++ b/src/emqx_mountpoint.erl @@ -17,7 +17,7 @@ -module(emqx_mountpoint). -include("emqx.hrl"). --include("logger.hrl"). +-include("types.hrl"). -export([ mount/2 , unmount/2 @@ -29,41 +29,46 @@ -type(mountpoint() :: binary()). -%%-------------------------------------------------------------------- -%% APIs -%%-------------------------------------------------------------------- - +-spec(mount(maybe(mountpoint()), Any) -> Any + when Any :: emqx_types:topic() + | emqx_types:message() + | emqx_types:topic_filters()). mount(undefined, Any) -> Any; mount(MountPoint, Topic) when is_binary(Topic) -> - <>; + prefix(MountPoint, Topic); mount(MountPoint, Msg = #message{topic = Topic}) -> - Msg#message{topic = <>}; + Msg#message{topic = prefix(MountPoint, Topic)}; mount(MountPoint, TopicFilters) when is_list(TopicFilters) -> - [{<>, SubOpts} - || {Topic, SubOpts} <- TopicFilters]. + [{prefix(MountPoint, Topic), SubOpts} || {Topic, SubOpts} <- TopicFilters]. -unmount(undefined, Msg) -> - Msg; -%% TODO: Fixme later +%% @private +-compile({inline, [prefix/2]}). +prefix(MountPoint, Topic) -> + <>. + +-spec(unmount(maybe(mountpoint()), Any) -> Any + when Any :: emqx_types:topic() + | emqx_types:message()). +unmount(undefined, Any) -> + Any; unmount(MountPoint, Topic) when is_binary(Topic) -> - try split_binary(Topic, byte_size(MountPoint)) of - {MountPoint, Topic1} -> Topic1 - catch - error:badarg-> Topic + case string:prefix(Topic, MountPoint) of + nomatch -> Topic; + Topic1 -> Topic1 end; unmount(MountPoint, Msg = #message{topic = Topic}) -> - try split_binary(Topic, byte_size(MountPoint)) of - {MountPoint, Topic1} -> Msg#message{topic = Topic1} - catch - error:badarg-> - Msg + case string:prefix(Topic, MountPoint) of + nomatch -> Msg; + Topic1 -> Msg#message{topic = Topic1} end. +-spec(replvar(maybe(mountpoint()), map()) -> maybe(mountpoint())). replvar(undefined, _Vars) -> undefined; replvar(MountPoint, #{client_id := ClientId, username := Username}) -> - lists:foldl(fun feed_var/2, MountPoint, [{<<"%c">>, ClientId}, {<<"%u">>, Username}]). + lists:foldl(fun feed_var/2, MountPoint, + [{<<"%c">>, ClientId}, {<<"%u">>, Username}]). feed_var({<<"%c">>, ClientId}, MountPoint) -> emqx_topic:feed_var(<<"%c">>, ClientId, MountPoint); diff --git a/src/emqx_reason_codes.erl b/src/emqx_reason_codes.erl index 327b96018..784552a2d 100644 --- a/src/emqx_reason_codes.erl +++ b/src/emqx_reason_codes.erl @@ -163,5 +163,7 @@ connack_error(banned) -> ?RC_BANNED; connack_error(bad_authentication_method) -> ?RC_BAD_AUTHENTICATION_METHOD; connack_error(_) -> ?RC_NOT_AUTHORIZED. +%%TODO: This function should be removed. puback([]) -> ?RC_NO_MATCHING_SUBSCRIBERS; puback(L) when is_list(L) -> ?RC_SUCCESS. + diff --git a/src/emqx_router.erl b/src/emqx_router.erl index f1c96b190..d0e5bf188 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -66,16 +66,16 @@ -type(group() :: binary()). --type(destination() :: node() | {group(), node()}). +-type(dest() :: node() | {group(), node()}). --define(ROUTE, emqx_route). +-define(ROUTE_TAB, emqx_route). %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- mnesia(boot) -> - ok = ekka_mnesia:create_table(?ROUTE, [ + ok = ekka_mnesia:create_table(?ROUTE_TAB, [ {type, bag}, {ram_copies, [node()]}, {record_name, route}, @@ -83,7 +83,7 @@ mnesia(boot) -> {storage_properties, [{ets, [{read_concurrency, true}, {write_concurrency, true}]}]}]); mnesia(copy) -> - ok = ekka_mnesia:copy_table(?ROUTE). + ok = ekka_mnesia:copy_table(?ROUTE_TAB). %%-------------------------------------------------------------------- %% Start a router @@ -102,7 +102,7 @@ start_link(Pool, Id) -> add_route(Topic) when is_binary(Topic) -> add_route(Topic, node()). --spec(add_route(emqx_topic:topic(), destination()) -> ok | {error, term()}). +-spec(add_route(emqx_topic:topic(), dest()) -> ok | {error, term()}). add_route(Topic, Dest) when is_binary(Topic) -> call(pick(Topic), {add_route, Topic, Dest}). @@ -110,7 +110,7 @@ add_route(Topic, Dest) when is_binary(Topic) -> do_add_route(Topic) when is_binary(Topic) -> do_add_route(Topic, node()). --spec(do_add_route(emqx_topic:topic(), destination()) -> ok | {error, term()}). +-spec(do_add_route(emqx_topic:topic(), dest()) -> ok | {error, term()}). do_add_route(Topic, Dest) when is_binary(Topic) -> Route = #route{topic = Topic, dest = Dest}, case lists:member(Route, lookup_routes(Topic)) of @@ -142,17 +142,17 @@ match_trie(Topic) -> -spec(lookup_routes(emqx_topic:topic()) -> [emqx_types:route()]). lookup_routes(Topic) -> - ets:lookup(?ROUTE, Topic). + ets:lookup(?ROUTE_TAB, Topic). -spec(has_routes(emqx_topic:topic()) -> boolean()). has_routes(Topic) when is_binary(Topic) -> - ets:member(?ROUTE, Topic). + ets:member(?ROUTE_TAB, Topic). -spec(delete_route(emqx_topic:topic()) -> ok | {error, term()}). delete_route(Topic) when is_binary(Topic) -> delete_route(Topic, node()). --spec(delete_route(emqx_topic:topic(), destination()) -> ok | {error, term()}). +-spec(delete_route(emqx_topic:topic(), dest()) -> ok | {error, term()}). delete_route(Topic, Dest) when is_binary(Topic) -> call(pick(Topic), {delete_route, Topic, Dest}). @@ -160,7 +160,7 @@ delete_route(Topic, Dest) when is_binary(Topic) -> do_delete_route(Topic) when is_binary(Topic) -> do_delete_route(Topic, node()). --spec(do_delete_route(emqx_topic:topic(), destination()) -> ok | {error, term()}). +-spec(do_delete_route(emqx_topic:topic(), dest()) -> ok | {error, term()}). do_delete_route(Topic, Dest) -> Route = #route{topic = Topic, dest = Dest}, case emqx_topic:wildcard(Topic) of @@ -170,7 +170,7 @@ do_delete_route(Topic, Dest) -> -spec(topics() -> list(emqx_topic:topic())). topics() -> - mnesia:dirty_all_keys(?ROUTE). + mnesia:dirty_all_keys(?ROUTE_TAB). %% @doc Print routes to a topic -spec(print_routes(emqx_topic:topic()) -> ok). @@ -224,25 +224,25 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- insert_direct_route(Route) -> - mnesia:async_dirty(fun mnesia:write/3, [?ROUTE, Route, sticky_write]). + mnesia:async_dirty(fun mnesia:write/3, [?ROUTE_TAB, Route, sticky_write]). insert_trie_route(Route = #route{topic = Topic}) -> - case mnesia:wread({?ROUTE, Topic}) of + case mnesia:wread({?ROUTE_TAB, Topic}) of [] -> emqx_trie:insert(Topic); _ -> ok end, - mnesia:write(?ROUTE, Route, sticky_write). + mnesia:write(?ROUTE_TAB, Route, sticky_write). delete_direct_route(Route) -> - mnesia:async_dirty(fun mnesia:delete_object/3, [?ROUTE, Route, sticky_write]). + mnesia:async_dirty(fun mnesia:delete_object/3, [?ROUTE_TAB, Route, sticky_write]). delete_trie_route(Route = #route{topic = Topic}) -> - case mnesia:wread({?ROUTE, Topic}) of + case mnesia:wread({?ROUTE_TAB, Topic}) of [Route] -> %% Remove route and trie - ok = mnesia:delete_object(?ROUTE, Route, sticky_write), + ok = mnesia:delete_object(?ROUTE_TAB, Route, sticky_write), emqx_trie:delete(Topic); [_|_] -> %% Remove route only - mnesia:delete_object(?ROUTE, Route, sticky_write); + mnesia:delete_object(?ROUTE_TAB, Route, sticky_write); [] -> ok end. diff --git a/src/emqx_trie.erl b/src/emqx_trie.erl index d437fec9d..c0c037a7c 100644 --- a/src/emqx_trie.erl +++ b/src/emqx_trie.erl @@ -34,8 +34,8 @@ -export([empty/0]). %% Mnesia tables --define(TRIE, emqx_trie). --define(TRIE_NODE, emqx_trie_node). +-define(TRIE_TAB, emqx_trie). +-define(TRIE_NODE_TAB, emqx_trie_node). %%-------------------------------------------------------------------- %% Mnesia bootstrap @@ -48,13 +48,13 @@ mnesia(boot) -> StoreProps = [{ets, [{read_concurrency, true}, {write_concurrency, true}]}], %% Trie table - ok = ekka_mnesia:create_table(?TRIE, [ + ok = ekka_mnesia:create_table(?TRIE_TAB, [ {ram_copies, [node()]}, {record_name, trie}, {attributes, record_info(fields, trie)}, {storage_properties, StoreProps}]), %% Trie node table - ok = ekka_mnesia:create_table(?TRIE_NODE, [ + ok = ekka_mnesia:create_table(?TRIE_NODE_TAB, [ {ram_copies, [node()]}, {record_name, trie_node}, {attributes, record_info(fields, trie_node)}, @@ -62,9 +62,9 @@ mnesia(boot) -> mnesia(copy) -> %% Copy trie table - ok = ekka_mnesia:copy_table(?TRIE), + ok = ekka_mnesia:copy_table(?TRIE_TAB), %% Copy trie_node table - ok = ekka_mnesia:copy_table(?TRIE_NODE). + ok = ekka_mnesia:copy_table(?TRIE_NODE_TAB). %%-------------------------------------------------------------------- %% Trie APIs @@ -73,7 +73,7 @@ mnesia(copy) -> %% @doc Insert a topic filter into the trie. -spec(insert(emqx_topic:topic()) -> ok). insert(Topic) when is_binary(Topic) -> - case mnesia:wread({?TRIE_NODE, Topic}) of + case mnesia:wread({?TRIE_NODE_TAB, Topic}) of [#trie_node{topic = Topic}] -> ok; [TrieNode = #trie_node{topic = undefined}] -> @@ -94,14 +94,14 @@ match(Topic) when is_binary(Topic) -> %% @doc Lookup a trie node. -spec(lookup(NodeId :: binary()) -> [#trie_node{}]). lookup(NodeId) -> - mnesia:read(?TRIE_NODE, NodeId). + mnesia:read(?TRIE_NODE_TAB, NodeId). %% @doc Delete a topic filter from the trie. -spec(delete(emqx_topic:topic()) -> ok). delete(Topic) when is_binary(Topic) -> - case mnesia:wread({?TRIE_NODE, Topic}) of + case mnesia:wread({?TRIE_NODE_TAB, Topic}) of [#trie_node{edge_count = 0}] -> - ok = mnesia:delete({?TRIE_NODE, Topic}), + ok = mnesia:delete({?TRIE_NODE_TAB, Topic}), delete_path(lists:reverse(emqx_topic:triples(Topic))); [TrieNode] -> write_trie_node(TrieNode#trie_node{topic = undefined}); @@ -111,7 +111,7 @@ delete(Topic) when is_binary(Topic) -> %% @doc Is the trie empty? -spec(empty() -> boolean()). empty() -> - ets:info(?TRIE, size) == 0. + ets:info(?TRIE_TAB, size) == 0. %%-------------------------------------------------------------------- %% Internal functions @@ -121,9 +121,9 @@ empty() -> %% @doc Add a path to the trie. add_path({Node, Word, Child}) -> Edge = #trie_edge{node_id = Node, word = Word}, - case mnesia:wread({?TRIE_NODE, Node}) of + case mnesia:wread({?TRIE_NODE_TAB, Node}) of [TrieNode = #trie_node{edge_count = Count}] -> - case mnesia:wread({?TRIE, Edge}) of + case mnesia:wread({?TRIE_TAB, Edge}) of [] -> ok = write_trie_node(TrieNode#trie_node{edge_count = Count + 1}), write_trie(#trie{edge = Edge, node_id = Child}); @@ -143,11 +143,11 @@ match_node(NodeId, Words) -> match_node(NodeId, Words, []). match_node(NodeId, [], ResAcc) -> - mnesia:read(?TRIE_NODE, NodeId) ++ 'match_#'(NodeId, ResAcc); + mnesia:read(?TRIE_NODE_TAB, NodeId) ++ 'match_#'(NodeId, ResAcc); match_node(NodeId, [W|Words], ResAcc) -> lists:foldl(fun(WArg, Acc) -> - case mnesia:read(?TRIE, #trie_edge{node_id = NodeId, word = WArg}) of + case mnesia:read(?TRIE_TAB, #trie_edge{node_id = NodeId, word = WArg}) of [#trie{node_id = ChildId}] -> match_node(ChildId, Words, Acc); [] -> Acc end @@ -156,9 +156,9 @@ match_node(NodeId, [W|Words], ResAcc) -> %% @private %% @doc Match node with '#'. 'match_#'(NodeId, ResAcc) -> - case mnesia:read(?TRIE, #trie_edge{node_id = NodeId, word = '#'}) of + case mnesia:read(?TRIE_TAB, #trie_edge{node_id = NodeId, word = '#'}) of [#trie{node_id = ChildId}] -> - mnesia:read(?TRIE_NODE, ChildId) ++ ResAcc; + mnesia:read(?TRIE_NODE_TAB, ChildId) ++ ResAcc; [] -> ResAcc end. @@ -167,10 +167,10 @@ match_node(NodeId, [W|Words], ResAcc) -> delete_path([]) -> ok; delete_path([{NodeId, Word, _} | RestPath]) -> - ok = mnesia:delete({?TRIE, #trie_edge{node_id = NodeId, word = Word}}), - case mnesia:wread({?TRIE_NODE, NodeId}) of + ok = mnesia:delete({?TRIE_TAB, #trie_edge{node_id = NodeId, word = Word}}), + case mnesia:wread({?TRIE_NODE_TAB, NodeId}) of [#trie_node{edge_count = 1, topic = undefined}] -> - ok = mnesia:delete({?TRIE_NODE, NodeId}), + ok = mnesia:delete({?TRIE_NODE_TAB, NodeId}), delete_path(RestPath); [TrieNode = #trie_node{edge_count = 1, topic = _}] -> write_trie_node(TrieNode#trie_node{edge_count = 0}); @@ -182,9 +182,9 @@ delete_path([{NodeId, Word, _} | RestPath]) -> %% @private write_trie(Trie) -> - mnesia:write(?TRIE, Trie, write). + mnesia:write(?TRIE_TAB, Trie, write). %% @private write_trie_node(TrieNode) -> - mnesia:write(?TRIE_NODE, TrieNode, write). + mnesia:write(?TRIE_NODE_TAB, TrieNode, write).