diff --git a/apps/emqx_gateway/src/emqx_gateway.erl b/apps/emqx_gateway/src/emqx_gateway.erl index 23e9ce19c..96cc5d4ae 100644 --- a/apps/emqx_gateway/src/emqx_gateway.erl +++ b/apps/emqx_gateway/src/emqx_gateway.erl @@ -66,6 +66,7 @@ lookup(Name) -> emqx_gateway_sup:lookup_gateway(Name). -spec update(gateway_name(), emqx_config:config()) -> ok | {error, any()}. +%% @doc This function only supports full configuration updates update(Name, Config) -> emqx_gateway_sup:update_gateway(Name, Config). diff --git a/apps/emqx_gateway/src/emqx_gateway_api.erl b/apps/emqx_gateway/src/emqx_gateway_api.erl index 78233e0b8..26b1a30ad 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api.erl @@ -84,8 +84,12 @@ gateway(post, Request) -> gateway_insta(delete, #{bindings := #{name := Name0}}) -> with_gateway(Name0, fun(GwName, _) -> - _ = emqx_gateway:unload(GwName), - {204} + case emqx_gateway_conf:unload_gateway(GwName) of + ok -> + {204}; + {error, Reason} -> + return_http_error(400, Reason) + end end); gateway_insta(get, #{bindings := #{name := Name0}}) -> with_gateway(Name0, fun(_, _) -> diff --git a/apps/emqx_gateway/src/emqx_gateway_cli.erl b/apps/emqx_gateway/src/emqx_gateway_cli.erl index 6ccb444f0..27c568fee 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cli.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cli.erl @@ -53,50 +53,77 @@ gateway(["list"]) -> lists:foreach(fun(#{name := Name} = Gateway) -> %% TODO: More infos: listeners?, connected? Status = maps:get(status, Gateway, stopped), - emqx_ctl:print("Gateway(name=~s, status=~s)~n", - [Name, Status]) + print("Gateway(name=~s, status=~s)~n", [Name, Status]) end, emqx_gateway:list()); gateway(["lookup", Name]) -> case emqx_gateway:lookup(atom(Name)) of undefined -> - emqx_ctl:print("undefined~n"); + print("undefined~n"); Info -> - emqx_ctl:print("~p~n", [Info]) + print("~p~n", [Info]) + end; + +gateway(["load", Name, Conf]) -> + case emqx_gateway_conf:load_gateway( + bin(Name), + emqx_json:decode(Conf, [return_maps]) + ) of + ok -> + print("ok~n"); + {error, Reason} -> + print("Error: ~p~n", [Reason]) + end; + +gateway(["unload", Name]) -> + case emqx_gateway_conf:unload_gateway(bin(Name)) of + ok -> + print("ok~n"); + {error, Reason} -> + print("Error: ~p~n", [Reason]) end; gateway(["stop", Name]) -> - case emqx_gateway:stop(atom(Name)) of + case emqx_gateway_conf:update_gateway( + bin(Name), + #{<<"enable">> => <<"false">>} + ) of ok -> - emqx_ctl:print("ok~n"); + print("ok~n"); {error, Reason} -> - emqx_ctl:print("Error: ~p~n", [Reason]) + print("Error: ~p~n", [Reason]) end; gateway(["start", Name]) -> - case emqx_gateway:start(atom(Name)) of + case emqx_gateway_conf:update_gateway( + bin(Name), + #{<<"enable">> => <<"true">>} + ) of ok -> - emqx_ctl:print("ok~n"); + print("ok~n"); {error, Reason} -> - emqx_ctl:print("Error: ~p~n", [Reason]) + print("Error: ~p~n", [Reason]) end; gateway(_) -> - %% TODO: create/remove APIs emqx_ctl:usage([ {"gateway list", "List all gateway"} , {"gateway lookup ", "Lookup a gateway detailed informations"} + , {"gateway load ", + "Load a gateway with config"} + , {"gateway unload ", + "Unload the gateway"} , {"gateway stop ", - "Stop a gateway instance"} + "Stop the gateway"} , {"gateway start ", - "Start a gateway instance"} + "Start the gateway"} ]). 'gateway-registry'(["list"]) -> lists:foreach( fun({Name, #{cbkmod := CbMod}}) -> - emqx_ctl:print("Registered Name: ~s, Callback Module: ~s~n", [Name, CbMod]) + print("Registered Name: ~s, Callback Module: ~s~n", [Name, CbMod]) end, emqx_gateway_registry:list()); @@ -106,11 +133,11 @@ gateway(_) -> ]). 'gateway-clients'(["list", Name]) -> - %% FIXME: page me. for example: --limit 100 --page 10 ??? + %% FIXME: page me? InfoTab = emqx_gateway_cm:tabname(info, Name), case ets:info(InfoTab) of undefined -> - emqx_ctl:print("Bad Gateway Name.~n"); + print("Bad Gateway Name.~n"); _ -> dump(InfoTab, client) end; @@ -118,7 +145,7 @@ gateway(_) -> 'gateway-clients'(["lookup", Name, ClientId]) -> ChanTab = emqx_gateway_cm:tabname(chan, Name), case ets:lookup(ChanTab, bin(ClientId)) of - [] -> emqx_ctl:print("Not Found.~n"); + [] -> print("Not Found.~n"); [Chann] -> InfoTab = emqx_gateway_cm:tabname(info, Name), [ChannInfo] = ets:lookup(InfoTab, Chann), @@ -127,8 +154,8 @@ gateway(_) -> 'gateway-clients'(["kick", Name, ClientId]) -> case emqx_gateway_cm:kick_session(Name, bin(ClientId)) of - ok -> emqx_ctl:print("ok~n"); - _ -> emqx_ctl:print("Not Found.~n") + ok -> print("ok~n"); + _ -> print("Not Found.~n") end; 'gateway-clients'(_) -> @@ -144,11 +171,11 @@ gateway(_) -> Tab = emqx_gateway_metrics:tabname(Name), case ets:info(Tab) of undefined -> - emqx_ctl:print("Bad Gateway Name.~n"); + print("Bad Gateway Name.~n"); _ -> lists:foreach( fun({K, V}) -> - emqx_ctl:print("~-30s: ~w~n", [K, V]) + print("~-30s: ~w~n", [K, V]) end, lists:sort(ets:tab2list(Tab))) end; @@ -177,10 +204,10 @@ dump(_Table, _, '$end_of_table', Result) -> lists:reverse(Result); dump(Table, Tag, Key, Result) -> - PrintValue = [print({Tag, Record}) || Record <- ets:lookup(Table, Key)], + PrintValue = [print_record({Tag, Record}) || Record <- ets:lookup(Table, Key)], dump(Table, Tag, ets:next(Table, Key), [PrintValue | Result]). -print({client, {_, Infos, Stats}}) -> +print_record({client, {_, Infos, Stats}}) -> ClientInfo = maps:get(clientinfo, Infos, #{}), ConnInfo = maps:get(conninfo, Infos, #{}), _Session = maps:get(session, Infos, #{}), @@ -202,11 +229,14 @@ print({client, {_, Infos, Stats}}) -> connected_at => ConnectedAt }, - emqx_ctl:print("Client(~s, username=~s, peername=~s, " - "clean_start=~s, keepalive=~w, " - "subscriptions=~w, delivered_msgs=~w, " - "connected=~s, created_at=~w, connected_at=~w)~n", - [format(K, maps:get(K, Info)) || K <- InfoKeys]). + print("Client(~s, username=~s, peername=~s, " + "clean_start=~s, keepalive=~w, " + "subscriptions=~w, delivered_msgs=~w, " + "connected=~s, created_at=~w, connected_at=~w)~n", + [format(K, maps:get(K, Info)) || K <- InfoKeys]). + +print(S) -> emqx_ctl:print(S). +print(S, A) -> emqx_ctl:print(S, A). format(_, undefined) -> undefined; diff --git a/apps/emqx_gateway/src/emqx_gateway_conf.erl b/apps/emqx_gateway/src/emqx_gateway_conf.erl index 05cba46b0..2b804a704 100644 --- a/apps/emqx_gateway/src/emqx_gateway_conf.erl +++ b/apps/emqx_gateway/src/emqx_gateway_conf.erl @@ -25,7 +25,7 @@ %% APIs -export([ load_gateway/2 , update_gateway/2 - , remove_gateway/1 + , unload_gateway/1 , add_listener/3 , update_listener/3 , remove_listener/2 @@ -64,14 +64,43 @@ unload() -> -spec load_gateway(atom_or_bin(), map()) -> ok_or_err(). load_gateway(GwName, Conf) -> - update({?FUNCTION_NAME, bin(GwName), Conf}). + NConf = case maps:take(<<"listeners">>, Conf) of + error -> Conf; + {Ls, Conf1} -> + Conf1#{<<"listeners">> => mapping(Ls)} + end, + update({?FUNCTION_NAME, bin(GwName), NConf}). + +mapping(Ls) when is_list(Ls) -> + convert_to_map(Ls); +mapping(Ls) when is_map(Ls) -> + Ls. + +convert_to_map(Listeners) when is_list(Listeners) -> + lists:foldl(fun(Lis, Acc) -> + {[Type, Name], Lis1} = maps_key_take([<<"type">>, <<"name">>], Lis), + emqx_map_lib:deep_merge(Acc, #{Type => #{Name => Lis1}}) + end, #{}, Listeners). + +maps_key_take(Ks, M) -> + maps_key_take(Ks, M, []). +maps_key_take([], M, Acc) -> + {lists:reverse(Acc), M}; +maps_key_take([K|Ks], M, Acc) -> + case maps:take(K, M) of + error -> throw(bad_key); + {V, M1} -> + maps_key_take(Ks, M1, [V|Acc]) + end. -spec update_gateway(atom_or_bin(), map()) -> ok_or_err(). -update_gateway(GwName, Conf) -> +update_gateway(GwName, Conf0) -> + Conf = maps:without([listeners, authentication, + <<"listeners">>, <<"authentication">>], Conf0), update({?FUNCTION_NAME, bin(GwName), Conf}). --spec remove_gateway(atom_or_bin()) -> ok_or_err(). -remove_gateway(GwName) -> +-spec unload_gateway(atom_or_bin()) -> ok_or_err(). +unload_gateway(GwName) -> update({?FUNCTION_NAME, bin(GwName)}). -spec add_listener(atom_or_bin(), listener_ref(), map()) -> ok_or_err(). @@ -148,7 +177,7 @@ pre_config_update({update_gateway, GwName, Conf}, RawConf) -> <<"authentication">>], Conf), {ok, emqx_map_lib:deep_merge(RawConf, #{GwName => NConf})} end; -pre_config_update({remove_gateway, GwName}, RawConf) -> +pre_config_update({unload_gateway, GwName}, RawConf) -> {ok, maps:remove(GwName, RawConf)}; pre_config_update({add_listener, GwName, {LType, LName}, Conf}, RawConf) -> diff --git a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl index 5f78a84b5..172193e28 100644 --- a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl @@ -298,17 +298,15 @@ do_deinit_authn(Names) -> end end, Names). -do_update_one_by_one(NCfg0, State = #state{ - config = OCfg, - status = Status}) -> - - NCfg = emqx_map_lib:deep_merge(OCfg, NCfg0), - +do_update_one_by_one(NCfg, State = #state{ + name = GwName, + config = OCfg, + status = Status}) -> OEnable = maps:get(enable, OCfg, true), - NEnable = maps:get(enable, NCfg0, OEnable), + NEnable = maps:get(enable, NCfg, OEnable), - OAuth = maps:get(authentication, OCfg, undefined), - NAuth = maps:get(authentication, NCfg0, OAuth), + OAuths = authns(GwName, OCfg), + NAuths = authns(GwName, NCfg), if Status == stopped, NEnable == true -> @@ -317,7 +315,7 @@ do_update_one_by_one(NCfg0, State = #state{ Status == stopped, NEnable == false -> {ok, State#state{config = NCfg}}; Status == running, NEnable == true -> - NState = case NAuth == OAuth of + NState = case NAuths == OAuths of true -> State; false -> %% Reset Authentication first @@ -325,6 +323,7 @@ do_update_one_by_one(NCfg0, State = #state{ AuthnNames = init_authn(State#state.name, NCfg), State#state{authns = AuthnNames} end, + %% XXX: minimum impact update ??? cb_gateway_update(NCfg, NState); Status == running, NEnable == false -> case cb_gateway_unload(State) of