diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index c7167c8a7..6631b4566 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -34,7 +34,7 @@ -export([start_link/2]). %% Management and Monitor API --export([info/1, stats/1, kick/1]). +-export([info/1, stats/1, kick/1, clean_acl_cache/2]). -export([set_rate_limit/2, get_rate_limit/1]). @@ -92,6 +92,9 @@ unsubscribe(CPid, Topics) -> session(CPid) -> gen_server2:call(CPid, session, infinity). +clean_acl_cache(CPid, Topic) -> + gen_server2:call(CPid, {clean_acl_cache, Topic}). + %%-------------------------------------------------------------------- %% gen_server Callbacks %%-------------------------------------------------------------------- @@ -175,6 +178,10 @@ handle_call(get_rate_limit, _From, State = #client_state{rate_limit = Rl}) -> handle_call(session, _From, State = #client_state{proto_state = ProtoState}) -> reply(emqttd_protocol:session(ProtoState), State); +handle_call({clean_acl_cache, Topic}, _From, State) -> + erase({acl, publish, Topic}), + reply(ok, State); + handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index 0cec222fd..c6856cc4c 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -71,6 +71,11 @@ run(["help"]) -> usage(), ok; run(["set" | _] = CmdS) -> emqttd_cli_config:run(["config" | CmdS]), ok; +run(["showall"] = CmdS) -> + Cfgs = ets:tab2list(clique_config), + ?USAGE(), + ok; + run(["show" | _] = CmdS) -> emqttd_cli_config:run(["config" | CmdS]), ok; diff --git a/src/emqttd_mgmt.erl b/src/emqttd_mgmt.erl index a46f70d09..0561b86a5 100644 --- a/src/emqttd_mgmt.erl +++ b/src/emqttd_mgmt.erl @@ -41,7 +41,7 @@ -export([publish/1, subscribe/1, unsubscribe/1]). --export([kick_client/1]). +-export([kick_client/1, clean_acl_cache/2]). -define(KB, 1024). -define(MB, (1024*1024)). @@ -289,10 +289,7 @@ unsubscribe({ClientId, Topic})-> %%-------------------------------------------------------------------- kick_client(ClientId) -> Result = [kick_client(Node, ClientId) || Node <- ekka_mnesia:running_nodes()], - case lists:any(fun(Item) -> Item =:= ok end, Result) of - true -> {ok, [{status, success}]}; - false -> {ok, [{status, failure}]} - end. + lists:any(fun(Item) -> Item =:= ok end, Result). kick_client(Node, ClientId) when Node =:= node() -> case emqttd_cm:lookup(ClientId) of @@ -302,6 +299,19 @@ kick_client(Node, ClientId) when Node =:= node() -> kick_client(Node, ClientId) -> rpc_call(Node, kick_client, [Node, ClientId]). + +clean_acl_cache(ClientId, Topic) -> + Result = [clean_acl_cache(Node, ClientId, Topic) || Node <- ekka_mnesia:running_nodes()], + lists:any(fun(Item) -> Item =:= ok end, Result). + +clean_acl_cache(Node, ClientId, Topic) when Node =:= node() -> + case emqttd_cm:lookup(ClientId) of + undefined -> error; + #mqtt_client{client_pid = Pid}-> emqttd_client:clean_acl_cache(Pid, Topic) + end; +clean_acl_cache(Node, ClientId, Topic) -> + rpc_call(Node, clean_acl_cache, [Node, ClientId, Topic]). + %%-------------------------------------------------------------------- %% Internel Functions. %%-------------------------------------------------------------------- diff --git a/src/emqttd_rest_api.erl b/src/emqttd_rest_api.erl index 815081a4f..9129c2c14 100644 --- a/src/emqttd_rest_api.erl +++ b/src/emqttd_rest_api.erl @@ -25,6 +25,7 @@ -http_api({"^nodes/(.+?)/clients/(.+?)/?$", 'GET',client_list, []}). -http_api({"^clients/(.+?)/?$", 'GET', client, []}). -http_api({"^kick_client/(.+?)/?$", 'PUT', kick_client, []}). +-http_api({"^clean_acl_cache/(.+?)/?$", 'PUT', clean_acl_cache, [{<<"topic">>, binary}]}). -http_api({"^routes?$", 'GET', route_list, []}). -http_api({"^routes/(.+?)/?$", 'GET', route, []}). @@ -56,7 +57,7 @@ -http_api({"^nodes/(.+?)/plugins/(.+?)/?$", 'PUT', enabled, [{<<"active">>, bool}]}). -export([alarm_list/3]). --export([client/3, client_list/3, client_list/4, kick_client/3]). +-export([client/3, client_list/3, client_list/4, kick_client/3, clean_acl_cache/3]). -export([route/3, route_list/2]). -export([session/3, session_list/3, session_list/4]). -export([subscription/3, subscription_list/3, subscription_list/4]). @@ -108,8 +109,15 @@ client_list('GET', Params, Node, Key) -> kick_client('PUT', _Params, Key) -> case emqttd_mgmt:kick_client(l2b(Key)) of - ok -> {ok, []}; - error -> {error, [{code, ?ERROR12}]} + true -> {ok, []}; + false -> {error, [{code, ?ERROR12}]} + end. + +clean_acl_cache('PUT', Params, Key) -> + Topic = proplists:get_value(<<"topic">>, Params), + case emqttd_mgmt:clean_acl_cache(l2b(Key), Topic) of + true -> {ok, []}; + false -> {error, [{code, ?ERROR12}]} end. client_row(#mqtt_client{client_id = ClientId, diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index 2433b2ea8..a583611a4 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -34,7 +34,7 @@ -export([start_link/4]). %% Management and Monitor API --export([info/1, stats/1, kick/1]). +-export([info/1, stats/1, kick/1, clean_acl_cache/2]). %% SUB/UNSUB Asynchronously -export([subscribe/2, unsubscribe/2]). @@ -82,6 +82,9 @@ unsubscribe(CPid, Topics) -> session(CPid) -> gen_server2:call(CPid, session). +clean_acl_cache(CPid, Topic) -> + gen_server2:call(CPid, {clean_acl_cache, Topic}). + %%-------------------------------------------------------------------- %% gen_server Callbacks %%-------------------------------------------------------------------- @@ -133,6 +136,10 @@ handle_call(kick, _From, State) -> handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) -> reply(emqttd_protocol:session(ProtoState), State); +handle_call({clean_acl_cache, Topic}, _From, State) -> + erase({acl, publish, Topic}), + reply(ok, State); + handle_call(Req, _From, State) -> ?WSLOG(error, "Unexpected request: ~p", [Req], State), reply({error, unexpected_request}, State).