Add clean_acl_cache API

This commit is contained in:
turtled 2017-08-15 12:18:31 +08:00
parent 37ba47365c
commit 303db5ccbd
5 changed files with 47 additions and 10 deletions

View File

@ -34,7 +34,7 @@
-export([start_link/2]). -export([start_link/2]).
%% Management and Monitor API %% Management and Monitor API
-export([info/1, stats/1, kick/1]). -export([info/1, stats/1, kick/1, clean_acl_cache/2]).
-export([set_rate_limit/2, get_rate_limit/1]). -export([set_rate_limit/2, get_rate_limit/1]).
@ -92,6 +92,9 @@ unsubscribe(CPid, Topics) ->
session(CPid) -> session(CPid) ->
gen_server2:call(CPid, session, infinity). gen_server2:call(CPid, session, infinity).
clean_acl_cache(CPid, Topic) ->
gen_server2:call(CPid, {clean_acl_cache, Topic}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server Callbacks %% gen_server Callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -175,6 +178,10 @@ handle_call(get_rate_limit, _From, State = #client_state{rate_limit = Rl}) ->
handle_call(session, _From, State = #client_state{proto_state = ProtoState}) -> handle_call(session, _From, State = #client_state{proto_state = ProtoState}) ->
reply(emqttd_protocol:session(ProtoState), State); reply(emqttd_protocol:session(ProtoState), State);
handle_call({clean_acl_cache, Topic}, _From, State) ->
erase({acl, publish, Topic}),
reply(ok, State);
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?UNEXPECTED_REQ(Req, State). ?UNEXPECTED_REQ(Req, State).

View File

@ -71,6 +71,11 @@ run(["help"]) -> usage(), ok;
run(["set" | _] = CmdS) -> run(["set" | _] = CmdS) ->
emqttd_cli_config:run(["config" | CmdS]), ok; emqttd_cli_config:run(["config" | CmdS]), ok;
run(["showall"] = CmdS) ->
Cfgs = ets:tab2list(clique_config),
?USAGE(),
ok;
run(["show" | _] = CmdS) -> run(["show" | _] = CmdS) ->
emqttd_cli_config:run(["config" | CmdS]), ok; emqttd_cli_config:run(["config" | CmdS]), ok;

View File

@ -41,7 +41,7 @@
-export([publish/1, subscribe/1, unsubscribe/1]). -export([publish/1, subscribe/1, unsubscribe/1]).
-export([kick_client/1]). -export([kick_client/1, clean_acl_cache/2]).
-define(KB, 1024). -define(KB, 1024).
-define(MB, (1024*1024)). -define(MB, (1024*1024)).
@ -289,10 +289,7 @@ unsubscribe({ClientId, Topic})->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
kick_client(ClientId) -> kick_client(ClientId) ->
Result = [kick_client(Node, ClientId) || Node <- ekka_mnesia:running_nodes()], Result = [kick_client(Node, ClientId) || Node <- ekka_mnesia:running_nodes()],
case lists:any(fun(Item) -> Item =:= ok end, Result) of lists:any(fun(Item) -> Item =:= ok end, Result).
true -> {ok, [{status, success}]};
false -> {ok, [{status, failure}]}
end.
kick_client(Node, ClientId) when Node =:= node() -> kick_client(Node, ClientId) when Node =:= node() ->
case emqttd_cm:lookup(ClientId) of case emqttd_cm:lookup(ClientId) of
@ -302,6 +299,19 @@ kick_client(Node, ClientId) when Node =:= node() ->
kick_client(Node, ClientId) -> kick_client(Node, ClientId) ->
rpc_call(Node, kick_client, [Node, ClientId]). rpc_call(Node, kick_client, [Node, ClientId]).
clean_acl_cache(ClientId, Topic) ->
Result = [clean_acl_cache(Node, ClientId, Topic) || Node <- ekka_mnesia:running_nodes()],
lists:any(fun(Item) -> Item =:= ok end, Result).
clean_acl_cache(Node, ClientId, Topic) when Node =:= node() ->
case emqttd_cm:lookup(ClientId) of
undefined -> error;
#mqtt_client{client_pid = Pid}-> emqttd_client:clean_acl_cache(Pid, Topic)
end;
clean_acl_cache(Node, ClientId, Topic) ->
rpc_call(Node, clean_acl_cache, [Node, ClientId, Topic]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internel Functions. %% Internel Functions.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -25,6 +25,7 @@
-http_api({"^nodes/(.+?)/clients/(.+?)/?$", 'GET',client_list, []}). -http_api({"^nodes/(.+?)/clients/(.+?)/?$", 'GET',client_list, []}).
-http_api({"^clients/(.+?)/?$", 'GET', client, []}). -http_api({"^clients/(.+?)/?$", 'GET', client, []}).
-http_api({"^kick_client/(.+?)/?$", 'PUT', kick_client, []}). -http_api({"^kick_client/(.+?)/?$", 'PUT', kick_client, []}).
-http_api({"^clean_acl_cache/(.+?)/?$", 'PUT', clean_acl_cache, [{<<"topic">>, binary}]}).
-http_api({"^routes?$", 'GET', route_list, []}). -http_api({"^routes?$", 'GET', route_list, []}).
-http_api({"^routes/(.+?)/?$", 'GET', route, []}). -http_api({"^routes/(.+?)/?$", 'GET', route, []}).
@ -56,7 +57,7 @@
-http_api({"^nodes/(.+?)/plugins/(.+?)/?$", 'PUT', enabled, [{<<"active">>, bool}]}). -http_api({"^nodes/(.+?)/plugins/(.+?)/?$", 'PUT', enabled, [{<<"active">>, bool}]}).
-export([alarm_list/3]). -export([alarm_list/3]).
-export([client/3, client_list/3, client_list/4, kick_client/3]). -export([client/3, client_list/3, client_list/4, kick_client/3, clean_acl_cache/3]).
-export([route/3, route_list/2]). -export([route/3, route_list/2]).
-export([session/3, session_list/3, session_list/4]). -export([session/3, session_list/3, session_list/4]).
-export([subscription/3, subscription_list/3, subscription_list/4]). -export([subscription/3, subscription_list/3, subscription_list/4]).
@ -108,8 +109,15 @@ client_list('GET', Params, Node, Key) ->
kick_client('PUT', _Params, Key) -> kick_client('PUT', _Params, Key) ->
case emqttd_mgmt:kick_client(l2b(Key)) of case emqttd_mgmt:kick_client(l2b(Key)) of
ok -> {ok, []}; true -> {ok, []};
error -> {error, [{code, ?ERROR12}]} false -> {error, [{code, ?ERROR12}]}
end.
clean_acl_cache('PUT', Params, Key) ->
Topic = proplists:get_value(<<"topic">>, Params),
case emqttd_mgmt:clean_acl_cache(l2b(Key), Topic) of
true -> {ok, []};
false -> {error, [{code, ?ERROR12}]}
end. end.
client_row(#mqtt_client{client_id = ClientId, client_row(#mqtt_client{client_id = ClientId,

View File

@ -34,7 +34,7 @@
-export([start_link/4]). -export([start_link/4]).
%% Management and Monitor API %% Management and Monitor API
-export([info/1, stats/1, kick/1]). -export([info/1, stats/1, kick/1, clean_acl_cache/2]).
%% SUB/UNSUB Asynchronously %% SUB/UNSUB Asynchronously
-export([subscribe/2, unsubscribe/2]). -export([subscribe/2, unsubscribe/2]).
@ -82,6 +82,9 @@ unsubscribe(CPid, Topics) ->
session(CPid) -> session(CPid) ->
gen_server2:call(CPid, session). gen_server2:call(CPid, session).
clean_acl_cache(CPid, Topic) ->
gen_server2:call(CPid, {clean_acl_cache, Topic}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server Callbacks %% gen_server Callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -133,6 +136,10 @@ handle_call(kick, _From, State) ->
handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) -> handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) ->
reply(emqttd_protocol:session(ProtoState), State); reply(emqttd_protocol:session(ProtoState), State);
handle_call({clean_acl_cache, Topic}, _From, State) ->
erase({acl, publish, Topic}),
reply(ok, State);
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?WSLOG(error, "Unexpected request: ~p", [Req], State), ?WSLOG(error, "Unexpected request: ~p", [Req], State),
reply({error, unexpected_request}, State). reply({error, unexpected_request}, State).