fix(gw): fix removing listener error

This commit is contained in:
JianBo He 2021-09-29 14:44:37 +08:00
parent 8e4256c06f
commit 642b588ad0
5 changed files with 109 additions and 46 deletions

View File

@ -66,6 +66,7 @@ lookup(Name) ->
emqx_gateway_sup:lookup_gateway(Name). emqx_gateway_sup:lookup_gateway(Name).
-spec update(gateway_name(), emqx_config:config()) -> ok | {error, any()}. -spec update(gateway_name(), emqx_config:config()) -> ok | {error, any()}.
%% @doc This function only supports full configuration updates
update(Name, Config) -> update(Name, Config) ->
emqx_gateway_sup:update_gateway(Name, Config). emqx_gateway_sup:update_gateway(Name, Config).

View File

@ -84,8 +84,12 @@ gateway(post, Request) ->
gateway_insta(delete, #{bindings := #{name := Name0}}) -> gateway_insta(delete, #{bindings := #{name := Name0}}) ->
with_gateway(Name0, fun(GwName, _) -> with_gateway(Name0, fun(GwName, _) ->
_ = emqx_gateway:unload(GwName), case emqx_gateway_conf:unload_gateway(GwName) of
{204} ok ->
{204};
{error, Reason} ->
return_http_error(400, Reason)
end
end); end);
gateway_insta(get, #{bindings := #{name := Name0}}) -> gateway_insta(get, #{bindings := #{name := Name0}}) ->
with_gateway(Name0, fun(_, _) -> with_gateway(Name0, fun(_, _) ->

View File

@ -53,50 +53,77 @@ gateway(["list"]) ->
lists:foreach(fun(#{name := Name} = Gateway) -> lists:foreach(fun(#{name := Name} = Gateway) ->
%% TODO: More infos: listeners?, connected? %% TODO: More infos: listeners?, connected?
Status = maps:get(status, Gateway, stopped), Status = maps:get(status, Gateway, stopped),
emqx_ctl:print("Gateway(name=~s, status=~s)~n", print("Gateway(name=~s, status=~s)~n", [Name, Status])
[Name, Status])
end, emqx_gateway:list()); end, emqx_gateway:list());
gateway(["lookup", Name]) -> gateway(["lookup", Name]) ->
case emqx_gateway:lookup(atom(Name)) of case emqx_gateway:lookup(atom(Name)) of
undefined -> undefined ->
emqx_ctl:print("undefined~n"); print("undefined~n");
Info -> Info ->
emqx_ctl:print("~p~n", [Info]) print("~p~n", [Info])
end;
gateway(["load", Name, Conf]) ->
case emqx_gateway_conf:load_gateway(
bin(Name),
emqx_json:decode(Conf, [return_maps])
) of
ok ->
print("ok~n");
{error, Reason} ->
print("Error: ~p~n", [Reason])
end;
gateway(["unload", Name]) ->
case emqx_gateway_conf:unload_gateway(bin(Name)) of
ok ->
print("ok~n");
{error, Reason} ->
print("Error: ~p~n", [Reason])
end; end;
gateway(["stop", Name]) -> gateway(["stop", Name]) ->
case emqx_gateway:stop(atom(Name)) of case emqx_gateway_conf:update_gateway(
bin(Name),
#{<<"enable">> => <<"false">>}
) of
ok -> ok ->
emqx_ctl:print("ok~n"); print("ok~n");
{error, Reason} -> {error, Reason} ->
emqx_ctl:print("Error: ~p~n", [Reason]) print("Error: ~p~n", [Reason])
end; end;
gateway(["start", Name]) -> gateway(["start", Name]) ->
case emqx_gateway:start(atom(Name)) of case emqx_gateway_conf:update_gateway(
bin(Name),
#{<<"enable">> => <<"true">>}
) of
ok -> ok ->
emqx_ctl:print("ok~n"); print("ok~n");
{error, Reason} -> {error, Reason} ->
emqx_ctl:print("Error: ~p~n", [Reason]) print("Error: ~p~n", [Reason])
end; end;
gateway(_) -> gateway(_) ->
%% TODO: create/remove APIs
emqx_ctl:usage([ {"gateway list", emqx_ctl:usage([ {"gateway list",
"List all gateway"} "List all gateway"}
, {"gateway lookup <Name>", , {"gateway lookup <Name>",
"Lookup a gateway detailed informations"} "Lookup a gateway detailed informations"}
, {"gateway load <Name> <JsonConf>",
"Load a gateway with config"}
, {"gateway unload <Name>",
"Unload the gateway"}
, {"gateway stop <Name>", , {"gateway stop <Name>",
"Stop a gateway instance"} "Stop the gateway"}
, {"gateway start <Name>", , {"gateway start <Name>",
"Start a gateway instance"} "Start the gateway"}
]). ]).
'gateway-registry'(["list"]) -> 'gateway-registry'(["list"]) ->
lists:foreach( lists:foreach(
fun({Name, #{cbkmod := CbMod}}) -> fun({Name, #{cbkmod := CbMod}}) ->
emqx_ctl:print("Registered Name: ~s, Callback Module: ~s~n", [Name, CbMod]) print("Registered Name: ~s, Callback Module: ~s~n", [Name, CbMod])
end, end,
emqx_gateway_registry:list()); emqx_gateway_registry:list());
@ -106,11 +133,11 @@ gateway(_) ->
]). ]).
'gateway-clients'(["list", Name]) -> 'gateway-clients'(["list", Name]) ->
%% FIXME: page me. for example: --limit 100 --page 10 ??? %% FIXME: page me?
InfoTab = emqx_gateway_cm:tabname(info, Name), InfoTab = emqx_gateway_cm:tabname(info, Name),
case ets:info(InfoTab) of case ets:info(InfoTab) of
undefined -> undefined ->
emqx_ctl:print("Bad Gateway Name.~n"); print("Bad Gateway Name.~n");
_ -> _ ->
dump(InfoTab, client) dump(InfoTab, client)
end; end;
@ -118,7 +145,7 @@ gateway(_) ->
'gateway-clients'(["lookup", Name, ClientId]) -> 'gateway-clients'(["lookup", Name, ClientId]) ->
ChanTab = emqx_gateway_cm:tabname(chan, Name), ChanTab = emqx_gateway_cm:tabname(chan, Name),
case ets:lookup(ChanTab, bin(ClientId)) of case ets:lookup(ChanTab, bin(ClientId)) of
[] -> emqx_ctl:print("Not Found.~n"); [] -> print("Not Found.~n");
[Chann] -> [Chann] ->
InfoTab = emqx_gateway_cm:tabname(info, Name), InfoTab = emqx_gateway_cm:tabname(info, Name),
[ChannInfo] = ets:lookup(InfoTab, Chann), [ChannInfo] = ets:lookup(InfoTab, Chann),
@ -127,8 +154,8 @@ gateway(_) ->
'gateway-clients'(["kick", Name, ClientId]) -> 'gateway-clients'(["kick", Name, ClientId]) ->
case emqx_gateway_cm:kick_session(Name, bin(ClientId)) of case emqx_gateway_cm:kick_session(Name, bin(ClientId)) of
ok -> emqx_ctl:print("ok~n"); ok -> print("ok~n");
_ -> emqx_ctl:print("Not Found.~n") _ -> print("Not Found.~n")
end; end;
'gateway-clients'(_) -> 'gateway-clients'(_) ->
@ -144,11 +171,11 @@ gateway(_) ->
Tab = emqx_gateway_metrics:tabname(Name), Tab = emqx_gateway_metrics:tabname(Name),
case ets:info(Tab) of case ets:info(Tab) of
undefined -> undefined ->
emqx_ctl:print("Bad Gateway Name.~n"); print("Bad Gateway Name.~n");
_ -> _ ->
lists:foreach( lists:foreach(
fun({K, V}) -> fun({K, V}) ->
emqx_ctl:print("~-30s: ~w~n", [K, V]) print("~-30s: ~w~n", [K, V])
end, lists:sort(ets:tab2list(Tab))) end, lists:sort(ets:tab2list(Tab)))
end; end;
@ -177,10 +204,10 @@ dump(_Table, _, '$end_of_table', Result) ->
lists:reverse(Result); lists:reverse(Result);
dump(Table, Tag, Key, Result) -> dump(Table, Tag, Key, Result) ->
PrintValue = [print({Tag, Record}) || Record <- ets:lookup(Table, Key)], PrintValue = [print_record({Tag, Record}) || Record <- ets:lookup(Table, Key)],
dump(Table, Tag, ets:next(Table, Key), [PrintValue | Result]). dump(Table, Tag, ets:next(Table, Key), [PrintValue | Result]).
print({client, {_, Infos, Stats}}) -> print_record({client, {_, Infos, Stats}}) ->
ClientInfo = maps:get(clientinfo, Infos, #{}), ClientInfo = maps:get(clientinfo, Infos, #{}),
ConnInfo = maps:get(conninfo, Infos, #{}), ConnInfo = maps:get(conninfo, Infos, #{}),
_Session = maps:get(session, Infos, #{}), _Session = maps:get(session, Infos, #{}),
@ -202,12 +229,15 @@ print({client, {_, Infos, Stats}}) ->
connected_at => ConnectedAt connected_at => ConnectedAt
}, },
emqx_ctl:print("Client(~s, username=~s, peername=~s, " print("Client(~s, username=~s, peername=~s, "
"clean_start=~s, keepalive=~w, " "clean_start=~s, keepalive=~w, "
"subscriptions=~w, delivered_msgs=~w, " "subscriptions=~w, delivered_msgs=~w, "
"connected=~s, created_at=~w, connected_at=~w)~n", "connected=~s, created_at=~w, connected_at=~w)~n",
[format(K, maps:get(K, Info)) || K <- InfoKeys]). [format(K, maps:get(K, Info)) || K <- InfoKeys]).
print(S) -> emqx_ctl:print(S).
print(S, A) -> emqx_ctl:print(S, A).
format(_, undefined) -> format(_, undefined) ->
undefined; undefined;

View File

@ -25,7 +25,7 @@
%% APIs %% APIs
-export([ load_gateway/2 -export([ load_gateway/2
, update_gateway/2 , update_gateway/2
, remove_gateway/1 , unload_gateway/1
, add_listener/3 , add_listener/3
, update_listener/3 , update_listener/3
, remove_listener/2 , remove_listener/2
@ -64,14 +64,43 @@ unload() ->
-spec load_gateway(atom_or_bin(), map()) -> ok_or_err(). -spec load_gateway(atom_or_bin(), map()) -> ok_or_err().
load_gateway(GwName, Conf) -> load_gateway(GwName, Conf) ->
update({?FUNCTION_NAME, bin(GwName), Conf}). NConf = case maps:take(<<"listeners">>, Conf) of
error -> Conf;
{Ls, Conf1} ->
Conf1#{<<"listeners">> => mapping(Ls)}
end,
update({?FUNCTION_NAME, bin(GwName), NConf}).
mapping(Ls) when is_list(Ls) ->
convert_to_map(Ls);
mapping(Ls) when is_map(Ls) ->
Ls.
convert_to_map(Listeners) when is_list(Listeners) ->
lists:foldl(fun(Lis, Acc) ->
{[Type, Name], Lis1} = maps_key_take([<<"type">>, <<"name">>], Lis),
emqx_map_lib:deep_merge(Acc, #{Type => #{Name => Lis1}})
end, #{}, Listeners).
maps_key_take(Ks, M) ->
maps_key_take(Ks, M, []).
maps_key_take([], M, Acc) ->
{lists:reverse(Acc), M};
maps_key_take([K|Ks], M, Acc) ->
case maps:take(K, M) of
error -> throw(bad_key);
{V, M1} ->
maps_key_take(Ks, M1, [V|Acc])
end.
-spec update_gateway(atom_or_bin(), map()) -> ok_or_err(). -spec update_gateway(atom_or_bin(), map()) -> ok_or_err().
update_gateway(GwName, Conf) -> update_gateway(GwName, Conf0) ->
Conf = maps:without([listeners, authentication,
<<"listeners">>, <<"authentication">>], Conf0),
update({?FUNCTION_NAME, bin(GwName), Conf}). update({?FUNCTION_NAME, bin(GwName), Conf}).
-spec remove_gateway(atom_or_bin()) -> ok_or_err(). -spec unload_gateway(atom_or_bin()) -> ok_or_err().
remove_gateway(GwName) -> unload_gateway(GwName) ->
update({?FUNCTION_NAME, bin(GwName)}). update({?FUNCTION_NAME, bin(GwName)}).
-spec add_listener(atom_or_bin(), listener_ref(), map()) -> ok_or_err(). -spec add_listener(atom_or_bin(), listener_ref(), map()) -> ok_or_err().
@ -148,7 +177,7 @@ pre_config_update({update_gateway, GwName, Conf}, RawConf) ->
<<"authentication">>], Conf), <<"authentication">>], Conf),
{ok, emqx_map_lib:deep_merge(RawConf, #{GwName => NConf})} {ok, emqx_map_lib:deep_merge(RawConf, #{GwName => NConf})}
end; end;
pre_config_update({remove_gateway, GwName}, RawConf) -> pre_config_update({unload_gateway, GwName}, RawConf) ->
{ok, maps:remove(GwName, RawConf)}; {ok, maps:remove(GwName, RawConf)};
pre_config_update({add_listener, GwName, {LType, LName}, Conf}, RawConf) -> pre_config_update({add_listener, GwName, {LType, LName}, Conf}, RawConf) ->

View File

@ -298,17 +298,15 @@ do_deinit_authn(Names) ->
end end
end, Names). end, Names).
do_update_one_by_one(NCfg0, State = #state{ do_update_one_by_one(NCfg, State = #state{
name = GwName,
config = OCfg, config = OCfg,
status = Status}) -> status = Status}) ->
NCfg = emqx_map_lib:deep_merge(OCfg, NCfg0),
OEnable = maps:get(enable, OCfg, true), OEnable = maps:get(enable, OCfg, true),
NEnable = maps:get(enable, NCfg0, OEnable), NEnable = maps:get(enable, NCfg, OEnable),
OAuth = maps:get(authentication, OCfg, undefined), OAuths = authns(GwName, OCfg),
NAuth = maps:get(authentication, NCfg0, OAuth), NAuths = authns(GwName, NCfg),
if if
Status == stopped, NEnable == true -> Status == stopped, NEnable == true ->
@ -317,7 +315,7 @@ do_update_one_by_one(NCfg0, State = #state{
Status == stopped, NEnable == false -> Status == stopped, NEnable == false ->
{ok, State#state{config = NCfg}}; {ok, State#state{config = NCfg}};
Status == running, NEnable == true -> Status == running, NEnable == true ->
NState = case NAuth == OAuth of NState = case NAuths == OAuths of
true -> State; true -> State;
false -> false ->
%% Reset Authentication first %% Reset Authentication first
@ -325,6 +323,7 @@ do_update_one_by_one(NCfg0, State = #state{
AuthnNames = init_authn(State#state.name, NCfg), AuthnNames = init_authn(State#state.name, NCfg),
State#state{authns = AuthnNames} State#state{authns = AuthnNames}
end, end,
%% XXX: minimum impact update ???
cb_gateway_update(NCfg, NState); cb_gateway_update(NCfg, NState);
Status == running, NEnable == false -> Status == running, NEnable == false ->
case cb_gateway_unload(State) of case cb_gateway_unload(State) of