Merge pull request #10942 from zhongwencool/gateway-conf-update
feat: update gateway from cli
This commit is contained in:
commit
0a017dd549
|
@ -16,7 +16,7 @@
|
||||||
|
|
||||||
-module(emqx_gateway).
|
-module(emqx_gateway).
|
||||||
|
|
||||||
-include("include/emqx_gateway.hrl").
|
-include("emqx_gateway.hrl").
|
||||||
|
|
||||||
%% Gateway APIs
|
%% Gateway APIs
|
||||||
-export([
|
-export([
|
||||||
|
|
|
@ -74,18 +74,20 @@
|
||||||
-type listener_ref() :: {ListenerType :: atom_or_bin(), ListenerName :: atom_or_bin()}.
|
-type listener_ref() :: {ListenerType :: atom_or_bin(), ListenerName :: atom_or_bin()}.
|
||||||
|
|
||||||
-define(IS_SSL(T), (T == <<"ssl_options">> orelse T == <<"dtls_options">>)).
|
-define(IS_SSL(T), (T == <<"ssl_options">> orelse T == <<"dtls_options">>)).
|
||||||
|
-define(IGNORE_KEYS, [<<"listeners">>, ?AUTHN_BIN]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Load/Unload
|
%% Load/Unload
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
-define(GATEWAY, [gateway]).
|
||||||
|
|
||||||
-spec load() -> ok.
|
-spec load() -> ok.
|
||||||
load() ->
|
load() ->
|
||||||
emqx_conf:add_handler([gateway], ?MODULE).
|
emqx_conf:add_handler(?GATEWAY, ?MODULE).
|
||||||
|
|
||||||
-spec unload() -> ok.
|
-spec unload() -> ok.
|
||||||
unload() ->
|
unload() ->
|
||||||
emqx_conf:remove_handler([gateway]).
|
emqx_conf:remove_handler(?GATEWAY).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% APIs
|
%% APIs
|
||||||
|
@ -104,7 +106,7 @@ unconvert_listeners(Ls) when is_list(Ls) ->
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun(Lis, Acc) ->
|
fun(Lis, Acc) ->
|
||||||
{[Type, Name], Lis1} = maps_key_take([<<"type">>, <<"name">>], Lis),
|
{[Type, Name], Lis1} = maps_key_take([<<"type">>, <<"name">>], Lis),
|
||||||
_ = vaildate_listener_name(Name),
|
_ = validate_listener_name(Name),
|
||||||
NLis1 = maps:without([<<"id">>, <<"running">>], Lis1),
|
NLis1 = maps:without([<<"id">>, <<"running">>], Lis1),
|
||||||
emqx_utils_maps:deep_merge(Acc, #{Type => #{Name => NLis1}})
|
emqx_utils_maps:deep_merge(Acc, #{Type => #{Name => NLis1}})
|
||||||
end,
|
end,
|
||||||
|
@ -122,7 +124,7 @@ maps_key_take([K | Ks], M, Acc) ->
|
||||||
{V, M1} -> maps_key_take(Ks, M1, [V | Acc])
|
{V, M1} -> maps_key_take(Ks, M1, [V | Acc])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
vaildate_listener_name(Name) ->
|
validate_listener_name(Name) ->
|
||||||
try
|
try
|
||||||
{match, _} = re:run(Name, "^[0-9a-zA-Z_-]+$"),
|
{match, _} = re:run(Name, "^[0-9a-zA-Z_-]+$"),
|
||||||
ok
|
ok
|
||||||
|
@ -373,7 +375,7 @@ ret_listener_or_err(_, _, Err) ->
|
||||||
emqx_config:raw_config()
|
emqx_config:raw_config()
|
||||||
) ->
|
) ->
|
||||||
{ok, emqx_config:update_request()} | {error, term()}.
|
{ok, emqx_config:update_request()} | {error, term()}.
|
||||||
pre_config_update(_, {load_gateway, GwName, Conf}, RawConf) ->
|
pre_config_update(?GATEWAY, {load_gateway, GwName, Conf}, RawConf) ->
|
||||||
case maps:get(GwName, RawConf, undefined) of
|
case maps:get(GwName, RawConf, undefined) of
|
||||||
undefined ->
|
undefined ->
|
||||||
NConf = tune_gw_certs(fun convert_certs/2, GwName, Conf),
|
NConf = tune_gw_certs(fun convert_certs/2, GwName, Conf),
|
||||||
|
@ -381,29 +383,25 @@ pre_config_update(_, {load_gateway, GwName, Conf}, RawConf) ->
|
||||||
_ ->
|
_ ->
|
||||||
badres_gateway(already_exist, GwName)
|
badres_gateway(already_exist, GwName)
|
||||||
end;
|
end;
|
||||||
pre_config_update(_, {update_gateway, GwName, Conf}, RawConf) ->
|
pre_config_update(?GATEWAY, {update_gateway, GwName, Conf}, RawConf) ->
|
||||||
case maps:get(GwName, RawConf, undefined) of
|
case maps:get(GwName, RawConf, undefined) of
|
||||||
undefined ->
|
undefined ->
|
||||||
badres_gateway(not_found, GwName);
|
badres_gateway(not_found, GwName);
|
||||||
GwRawConf ->
|
GwRawConf ->
|
||||||
Conf1 = maps:without([<<"listeners">>, ?AUTHN_BIN], Conf),
|
Conf1 = maps:without(?IGNORE_KEYS, Conf),
|
||||||
NConf = tune_gw_certs(fun convert_certs/2, GwName, Conf1),
|
NConf = tune_gw_certs(fun convert_certs/2, GwName, Conf1),
|
||||||
NConf1 = maps:merge(GwRawConf, NConf),
|
NConf1 = maps:merge(GwRawConf, NConf),
|
||||||
{ok, emqx_utils_maps:deep_put([GwName], RawConf, NConf1)}
|
{ok, emqx_utils_maps:deep_put([GwName], RawConf, NConf1)}
|
||||||
end;
|
end;
|
||||||
pre_config_update(_, {unload_gateway, GwName}, RawConf) ->
|
pre_config_update(?GATEWAY, {unload_gateway, GwName}, RawConf) ->
|
||||||
_ = tune_gw_certs(
|
_ = tune_gw_certs(
|
||||||
fun clear_certs/2,
|
fun clear_certs/2,
|
||||||
GwName,
|
GwName,
|
||||||
maps:get(GwName, RawConf, #{})
|
maps:get(GwName, RawConf, #{})
|
||||||
),
|
),
|
||||||
{ok, maps:remove(GwName, RawConf)};
|
{ok, maps:remove(GwName, RawConf)};
|
||||||
pre_config_update(_, {add_listener, GwName, {LType, LName}, Conf}, RawConf) ->
|
pre_config_update(?GATEWAY, {add_listener, GwName, {LType, LName}, Conf}, RawConf) ->
|
||||||
case
|
case get_listener(GwName, LType, LName, RawConf) of
|
||||||
emqx_utils_maps:deep_get(
|
|
||||||
[GwName, <<"listeners">>, LType, LName], RawConf, undefined
|
|
||||||
)
|
|
||||||
of
|
|
||||||
undefined ->
|
undefined ->
|
||||||
NConf = convert_certs(certs_dir(GwName), Conf),
|
NConf = convert_certs(certs_dir(GwName), Conf),
|
||||||
NListener = #{LType => #{LName => NConf}},
|
NListener = #{LType => #{LName => NConf}},
|
||||||
|
@ -415,12 +413,8 @@ pre_config_update(_, {add_listener, GwName, {LType, LName}, Conf}, RawConf) ->
|
||||||
_ ->
|
_ ->
|
||||||
badres_listener(already_exist, GwName, LType, LName)
|
badres_listener(already_exist, GwName, LType, LName)
|
||||||
end;
|
end;
|
||||||
pre_config_update(_, {update_listener, GwName, {LType, LName}, Conf}, RawConf) ->
|
pre_config_update(?GATEWAY, {update_listener, GwName, {LType, LName}, Conf}, RawConf) ->
|
||||||
case
|
case get_listener(GwName, LType, LName, RawConf) of
|
||||||
emqx_utils_maps:deep_get(
|
|
||||||
[GwName, <<"listeners">>, LType, LName], RawConf, undefined
|
|
||||||
)
|
|
||||||
of
|
|
||||||
undefined ->
|
undefined ->
|
||||||
badres_listener(not_found, GwName, LType, LName);
|
badres_listener(not_found, GwName, LType, LName);
|
||||||
OldConf ->
|
OldConf ->
|
||||||
|
@ -432,21 +426,17 @@ pre_config_update(_, {update_listener, GwName, {LType, LName}, Conf}, RawConf) -
|
||||||
),
|
),
|
||||||
{ok, NRawConf}
|
{ok, NRawConf}
|
||||||
end;
|
end;
|
||||||
pre_config_update(_, {remove_listener, GwName, {LType, LName}}, RawConf) ->
|
pre_config_update(?GATEWAY, {remove_listener, GwName, {LType, LName}}, RawConf) ->
|
||||||
Path = [GwName, <<"listeners">>, LType, LName],
|
case get_listener(GwName, LType, LName, RawConf) of
|
||||||
case emqx_utils_maps:deep_get(Path, RawConf, undefined) of
|
|
||||||
undefined ->
|
undefined ->
|
||||||
{ok, RawConf};
|
{ok, RawConf};
|
||||||
OldConf ->
|
OldConf ->
|
||||||
clear_certs(certs_dir(GwName), OldConf),
|
clear_certs(certs_dir(GwName), OldConf),
|
||||||
|
Path = [GwName, <<"listeners">>, LType, LName],
|
||||||
{ok, emqx_utils_maps:deep_remove(Path, RawConf)}
|
{ok, emqx_utils_maps:deep_remove(Path, RawConf)}
|
||||||
end;
|
end;
|
||||||
pre_config_update(_, {add_authn, GwName, Conf}, RawConf) ->
|
pre_config_update(?GATEWAY, {add_authn, GwName, Conf}, RawConf) ->
|
||||||
case
|
case get_authn(GwName, RawConf) of
|
||||||
emqx_utils_maps:deep_get(
|
|
||||||
[GwName, ?AUTHN_BIN], RawConf, undefined
|
|
||||||
)
|
|
||||||
of
|
|
||||||
undefined ->
|
undefined ->
|
||||||
CertsDir = authn_certs_dir(GwName, Conf),
|
CertsDir = authn_certs_dir(GwName, Conf),
|
||||||
Conf1 = emqx_authentication_config:convert_certs(CertsDir, Conf),
|
Conf1 = emqx_authentication_config:convert_certs(CertsDir, Conf),
|
||||||
|
@ -458,14 +448,8 @@ pre_config_update(_, {add_authn, GwName, Conf}, RawConf) ->
|
||||||
_ ->
|
_ ->
|
||||||
badres_authn(already_exist, GwName)
|
badres_authn(already_exist, GwName)
|
||||||
end;
|
end;
|
||||||
pre_config_update(_, {add_authn, GwName, {LType, LName}, Conf}, RawConf) ->
|
pre_config_update(?GATEWAY, {add_authn, GwName, {LType, LName}, Conf}, RawConf) ->
|
||||||
case
|
case get_listener(GwName, LType, LName, RawConf) of
|
||||||
emqx_utils_maps:deep_get(
|
|
||||||
[GwName, <<"listeners">>, LType, LName],
|
|
||||||
RawConf,
|
|
||||||
undefined
|
|
||||||
)
|
|
||||||
of
|
|
||||||
undefined ->
|
undefined ->
|
||||||
badres_listener(not_found, GwName, LType, LName);
|
badres_listener(not_found, GwName, LType, LName);
|
||||||
Listener ->
|
Listener ->
|
||||||
|
@ -486,12 +470,8 @@ pre_config_update(_, {add_authn, GwName, {LType, LName}, Conf}, RawConf) ->
|
||||||
badres_listener_authn(already_exist, GwName, LType, LName)
|
badres_listener_authn(already_exist, GwName, LType, LName)
|
||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
pre_config_update(_, {update_authn, GwName, Conf}, RawConf) ->
|
pre_config_update(?GATEWAY, {update_authn, GwName, Conf}, RawConf) ->
|
||||||
case
|
case get_authn(GwName, RawConf) of
|
||||||
emqx_utils_maps:deep_get(
|
|
||||||
[GwName, ?AUTHN_BIN], RawConf, undefined
|
|
||||||
)
|
|
||||||
of
|
|
||||||
undefined ->
|
undefined ->
|
||||||
badres_authn(not_found, GwName);
|
badres_authn(not_found, GwName);
|
||||||
OldAuthnConf ->
|
OldAuthnConf ->
|
||||||
|
@ -499,14 +479,8 @@ pre_config_update(_, {update_authn, GwName, Conf}, RawConf) ->
|
||||||
Conf1 = emqx_authentication_config:convert_certs(CertsDir, Conf, OldAuthnConf),
|
Conf1 = emqx_authentication_config:convert_certs(CertsDir, Conf, OldAuthnConf),
|
||||||
{ok, emqx_utils_maps:deep_put([GwName, ?AUTHN_BIN], RawConf, Conf1)}
|
{ok, emqx_utils_maps:deep_put([GwName, ?AUTHN_BIN], RawConf, Conf1)}
|
||||||
end;
|
end;
|
||||||
pre_config_update(_, {update_authn, GwName, {LType, LName}, Conf}, RawConf) ->
|
pre_config_update(?GATEWAY, {update_authn, GwName, {LType, LName}, Conf}, RawConf) ->
|
||||||
case
|
case get_listener(GwName, LType, LName, RawConf) of
|
||||||
emqx_utils_maps:deep_get(
|
|
||||||
[GwName, <<"listeners">>, LType, LName],
|
|
||||||
RawConf,
|
|
||||||
undefined
|
|
||||||
)
|
|
||||||
of
|
|
||||||
undefined ->
|
undefined ->
|
||||||
badres_listener(not_found, GwName, LType, LName);
|
badres_listener(not_found, GwName, LType, LName);
|
||||||
Listener ->
|
Listener ->
|
||||||
|
@ -533,12 +507,8 @@ pre_config_update(_, {update_authn, GwName, {LType, LName}, Conf}, RawConf) ->
|
||||||
)}
|
)}
|
||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
pre_config_update(_, {remove_authn, GwName}, RawConf) ->
|
pre_config_update(?GATEWAY, {remove_authn, GwName}, RawConf) ->
|
||||||
case
|
case get_authn(GwName, RawConf) of
|
||||||
emqx_utils_maps:deep_get(
|
|
||||||
[GwName, ?AUTHN_BIN], RawConf, undefined
|
|
||||||
)
|
|
||||||
of
|
|
||||||
undefined ->
|
undefined ->
|
||||||
ok;
|
ok;
|
||||||
OldAuthnConf ->
|
OldAuthnConf ->
|
||||||
|
@ -549,7 +519,7 @@ pre_config_update(_, {remove_authn, GwName}, RawConf) ->
|
||||||
emqx_utils_maps:deep_remove(
|
emqx_utils_maps:deep_remove(
|
||||||
[GwName, ?AUTHN_BIN], RawConf
|
[GwName, ?AUTHN_BIN], RawConf
|
||||||
)};
|
)};
|
||||||
pre_config_update(_, {remove_authn, GwName, {LType, LName}}, RawConf) ->
|
pre_config_update(?GATEWAY, {remove_authn, GwName, {LType, LName}}, RawConf) ->
|
||||||
Path = [GwName, <<"listeners">>, LType, LName, ?AUTHN_BIN],
|
Path = [GwName, <<"listeners">>, LType, LName, ?AUTHN_BIN],
|
||||||
case
|
case
|
||||||
emqx_utils_maps:deep_get(
|
emqx_utils_maps:deep_get(
|
||||||
|
@ -565,10 +535,184 @@ pre_config_update(_, {remove_authn, GwName, {LType, LName}}, RawConf) ->
|
||||||
emqx_authentication_config:clear_certs(CertsDir, OldAuthnConf)
|
emqx_authentication_config:clear_certs(CertsDir, OldAuthnConf)
|
||||||
end,
|
end,
|
||||||
{ok, emqx_utils_maps:deep_remove(Path, RawConf)};
|
{ok, emqx_utils_maps:deep_remove(Path, RawConf)};
|
||||||
pre_config_update(_, UnknownReq, _RawConf) ->
|
pre_config_update(?GATEWAY, NewRawConf0 = #{}, OldRawConf = #{}) ->
|
||||||
logger:error("Unknown configuration update request: ~0p", [UnknownReq]),
|
%% FIXME don't support gateway's listener's authn update.
|
||||||
|
%% load all authentications
|
||||||
|
NewRawConf1 = pre_load_authentications(NewRawConf0, OldRawConf),
|
||||||
|
%% load all listeners
|
||||||
|
NewRawConf2 = pre_load_listeners(NewRawConf1, OldRawConf),
|
||||||
|
%% load all gateway
|
||||||
|
NewRawConf3 = pre_load_gateways(NewRawConf2, OldRawConf),
|
||||||
|
{ok, NewRawConf3};
|
||||||
|
pre_config_update(Path, UnknownReq, _RawConf) ->
|
||||||
|
?SLOG(error, #{
|
||||||
|
msg => "unknown_gateway_update_request",
|
||||||
|
request => UnknownReq,
|
||||||
|
path => Path
|
||||||
|
}),
|
||||||
{error, badreq}.
|
{error, badreq}.
|
||||||
|
|
||||||
|
pre_load_gateways(NewConf, OldConf) ->
|
||||||
|
%% unload old gateways
|
||||||
|
maps:foreach(
|
||||||
|
fun(GwName, _OldGwConf) ->
|
||||||
|
case maps:find(GwName, NewConf) of
|
||||||
|
error -> pre_config_update(?GATEWAY, {unload_gateway, GwName}, OldConf);
|
||||||
|
_ -> ok
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
OldConf
|
||||||
|
),
|
||||||
|
%% load/update gateways
|
||||||
|
maps:map(
|
||||||
|
fun(GwName, NewGwConf) ->
|
||||||
|
case maps:find(GwName, OldConf) of
|
||||||
|
{ok, NewGwConf} ->
|
||||||
|
NewGwConf;
|
||||||
|
{ok, _OldGwConf} ->
|
||||||
|
{ok, #{GwName := NewGwConf1}} = pre_config_update(
|
||||||
|
?GATEWAY, {update_gateway, GwName, NewGwConf}, OldConf
|
||||||
|
),
|
||||||
|
%% update gateway should pass through ignore keys(listener/authn)
|
||||||
|
PassThroughConf = maps:with(?IGNORE_KEYS, NewGwConf),
|
||||||
|
NewGwConf2 = maps:without(?IGNORE_KEYS, NewGwConf1),
|
||||||
|
maps:merge(NewGwConf2, PassThroughConf);
|
||||||
|
error ->
|
||||||
|
{ok, #{GwName := NewGwConf1}} = pre_config_update(
|
||||||
|
?GATEWAY, {load_gateway, GwName, NewGwConf}, OldConf
|
||||||
|
),
|
||||||
|
NewGwConf1
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
NewConf
|
||||||
|
).
|
||||||
|
|
||||||
|
pre_load_listeners(NewConf, OldConf) ->
|
||||||
|
%% remove listeners
|
||||||
|
maps:foreach(
|
||||||
|
fun(GwName, GwConf) ->
|
||||||
|
Listeners = maps:get(<<"listeners">>, GwConf, #{}),
|
||||||
|
remove_listeners(GwName, NewConf, OldConf, Listeners)
|
||||||
|
end,
|
||||||
|
OldConf
|
||||||
|
),
|
||||||
|
%% add/update listeners
|
||||||
|
maps:map(
|
||||||
|
fun(GwName, GwConf) ->
|
||||||
|
Listeners = maps:get(<<"listeners">>, GwConf, #{}),
|
||||||
|
NewListeners = create_or_update_listeners(GwName, OldConf, Listeners),
|
||||||
|
maps:put(<<"listeners">>, NewListeners, GwConf)
|
||||||
|
end,
|
||||||
|
NewConf
|
||||||
|
).
|
||||||
|
|
||||||
|
create_or_update_listeners(GwName, OldConf, Listeners) ->
|
||||||
|
maps:map(
|
||||||
|
fun(LType, LConf) ->
|
||||||
|
maps:map(
|
||||||
|
fun(LName, LConf1) ->
|
||||||
|
NConf =
|
||||||
|
case get_listener(GwName, LType, LName, OldConf) of
|
||||||
|
undefined ->
|
||||||
|
{ok, NConf0} =
|
||||||
|
pre_config_update(
|
||||||
|
?GATEWAY,
|
||||||
|
{add_listener, GwName, {LType, LName}, LConf1},
|
||||||
|
OldConf
|
||||||
|
),
|
||||||
|
NConf0;
|
||||||
|
_ ->
|
||||||
|
{ok, NConf0} =
|
||||||
|
pre_config_update(
|
||||||
|
?GATEWAY,
|
||||||
|
{update_listener, GwName, {LType, LName}, LConf1},
|
||||||
|
OldConf
|
||||||
|
),
|
||||||
|
NConf0
|
||||||
|
end,
|
||||||
|
get_listener(GwName, LType, LName, NConf)
|
||||||
|
end,
|
||||||
|
LConf
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
Listeners
|
||||||
|
).
|
||||||
|
|
||||||
|
remove_listeners(GwName, NewConf, OldConf, Listeners) ->
|
||||||
|
maps:foreach(
|
||||||
|
fun(LType, LConf) ->
|
||||||
|
maps:foreach(
|
||||||
|
fun(LName, _LConf1) ->
|
||||||
|
case get_listener(GwName, LType, LName, NewConf) of
|
||||||
|
undefined ->
|
||||||
|
pre_config_update(
|
||||||
|
?GATEWAY, {remove_listener, GwName, {LType, LName}}, OldConf
|
||||||
|
);
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
LConf
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
Listeners
|
||||||
|
).
|
||||||
|
|
||||||
|
get_listener(GwName, LType, LName, NewConf) ->
|
||||||
|
emqx_utils_maps:deep_get(
|
||||||
|
[GwName, <<"listeners">>, LType, LName], NewConf, undefined
|
||||||
|
).
|
||||||
|
|
||||||
|
get_authn(GwName, Conf) ->
|
||||||
|
emqx_utils_maps:deep_get([GwName, ?AUTHN_BIN], Conf, undefined).
|
||||||
|
|
||||||
|
pre_load_authentications(NewConf, OldConf) ->
|
||||||
|
%% remove authentications when not in new config
|
||||||
|
maps:foreach(
|
||||||
|
fun(GwName, OldGwConf) ->
|
||||||
|
case
|
||||||
|
maps:get(?AUTHN_BIN, OldGwConf, undefined) =/= undefined andalso
|
||||||
|
get_authn(GwName, NewConf) =:= undefined
|
||||||
|
of
|
||||||
|
true ->
|
||||||
|
pre_config_update(?GATEWAY, {remove_authn, GwName}, OldConf);
|
||||||
|
false ->
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
OldConf
|
||||||
|
),
|
||||||
|
%% add/update authentications
|
||||||
|
maps:map(
|
||||||
|
fun(GwName, NewGwConf) ->
|
||||||
|
case get_authn(GwName, OldConf) of
|
||||||
|
undefined ->
|
||||||
|
case maps:get(?AUTHN_BIN, NewGwConf, undefined) of
|
||||||
|
undefined ->
|
||||||
|
NewGwConf;
|
||||||
|
AuthN ->
|
||||||
|
{ok, #{GwName := #{?AUTHN_BIN := NAuthN}}} =
|
||||||
|
pre_config_update(?GATEWAY, {add_authn, GwName, AuthN}, OldConf),
|
||||||
|
maps:put(?AUTHN_BIN, NAuthN, NewGwConf)
|
||||||
|
end;
|
||||||
|
OldAuthN ->
|
||||||
|
case maps:get(?AUTHN_BIN, NewGwConf, undefined) of
|
||||||
|
undefined ->
|
||||||
|
NewGwConf;
|
||||||
|
OldAuthN ->
|
||||||
|
NewGwConf;
|
||||||
|
NewAuthN ->
|
||||||
|
{ok, #{GwName := #{?AUTHN_BIN := NAuthN}}} =
|
||||||
|
pre_config_update(
|
||||||
|
?GATEWAY, {update_authn, GwName, NewAuthN}, OldConf
|
||||||
|
),
|
||||||
|
maps:put(?AUTHN_BIN, NAuthN, NewGwConf)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
NewConf
|
||||||
|
).
|
||||||
|
|
||||||
badres_gateway(not_found, GwName) ->
|
badres_gateway(not_found, GwName) ->
|
||||||
{error,
|
{error,
|
||||||
{badres, #{
|
{badres, #{
|
||||||
|
@ -642,7 +786,7 @@ badres_listener_authn(already_exist, GwName, LType, LName) ->
|
||||||
) ->
|
) ->
|
||||||
ok | {ok, Result :: any()} | {error, Reason :: term()}.
|
ok | {ok, Result :: any()} | {error, Reason :: term()}.
|
||||||
|
|
||||||
post_config_update(_, Req, NewConfig, OldConfig, _AppEnvs) when is_tuple(Req) ->
|
post_config_update(?GATEWAY, Req, NewConfig, OldConfig, _AppEnvs) when is_tuple(Req) ->
|
||||||
[_Tag, GwName0 | _] = tuple_to_list(Req),
|
[_Tag, GwName0 | _] = tuple_to_list(Req),
|
||||||
GwName = binary_to_existing_atom(GwName0),
|
GwName = binary_to_existing_atom(GwName0),
|
||||||
|
|
||||||
|
@ -657,11 +801,35 @@ post_config_update(_, Req, NewConfig, OldConfig, _AppEnvs) when is_tuple(Req) ->
|
||||||
{New, Old} when is_map(New), is_map(Old) ->
|
{New, Old} when is_map(New), is_map(Old) ->
|
||||||
emqx_gateway:update(GwName, New)
|
emqx_gateway:update(GwName, New)
|
||||||
end;
|
end;
|
||||||
post_config_update(_, _Req, _NewConfig, _OldConfig, _AppEnvs) ->
|
post_config_update(?GATEWAY, _Req = #{}, NewConfig, OldConfig, _AppEnvs) ->
|
||||||
|
%% unload gateways
|
||||||
|
maps:foreach(
|
||||||
|
fun(GwName, _OldGwConf) ->
|
||||||
|
case maps:get(GwName, NewConfig, undefined) of
|
||||||
|
undefined ->
|
||||||
|
emqx_gateway:unload(GwName);
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
OldConfig
|
||||||
|
),
|
||||||
|
%% load/update gateways
|
||||||
|
maps:foreach(
|
||||||
|
fun(GwName, NewGwConf) ->
|
||||||
|
case maps:get(GwName, OldConfig, undefined) of
|
||||||
|
undefined ->
|
||||||
|
emqx_gateway:load(GwName, NewGwConf);
|
||||||
|
_ ->
|
||||||
|
emqx_gateway:update(GwName, NewGwConf)
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
NewConfig
|
||||||
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal funcs
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
tune_gw_certs(Fun, GwName, Conf) ->
|
tune_gw_certs(Fun, GwName, Conf) ->
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
-include_lib("emqx_gateway/include/emqx_gateway.hrl").
|
-include("emqx_gateway.hrl").
|
||||||
|
|
||||||
%% APIs
|
%% APIs
|
||||||
-export([start_link/1]).
|
-export([start_link/1]).
|
||||||
|
|
|
@ -277,6 +277,48 @@ t_load_unload_gateway(_) ->
|
||||||
{config_not_found, [<<"gateway">>, stomp]},
|
{config_not_found, [<<"gateway">>, stomp]},
|
||||||
emqx:get_raw_config([gateway, stomp])
|
emqx:get_raw_config([gateway, stomp])
|
||||||
),
|
),
|
||||||
|
%% test update([gateway], Conf)
|
||||||
|
Raw0 = emqx:get_raw_config([gateway]),
|
||||||
|
#{<<"listeners">> := StompConfL1} = StompConf1,
|
||||||
|
StompConf11 = StompConf1#{
|
||||||
|
<<"listeners">> => emqx_gateway_conf:unconvert_listeners(StompConfL1)
|
||||||
|
},
|
||||||
|
#{<<"listeners">> := StompConfL2} = StompConf2,
|
||||||
|
StompConf22 = StompConf2#{
|
||||||
|
<<"listeners">> => emqx_gateway_conf:unconvert_listeners(StompConfL2)
|
||||||
|
},
|
||||||
|
Raw1 = Raw0#{<<"stomp">> => StompConf11},
|
||||||
|
Raw2 = Raw0#{<<"stomp">> => StompConf22},
|
||||||
|
?assertMatch({ok, _}, emqx:update_config([gateway], Raw1)),
|
||||||
|
assert_confs(StompConf1, emqx:get_raw_config([gateway, stomp])),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
config := #{
|
||||||
|
authentication := #{backend := built_in_database, enable := true},
|
||||||
|
listeners := #{tcp := #{default := #{bind := 61613}}},
|
||||||
|
mountpoint := <<"t/">>,
|
||||||
|
idle_timeout := 10000
|
||||||
|
}
|
||||||
|
},
|
||||||
|
emqx_gateway:lookup('stomp')
|
||||||
|
),
|
||||||
|
?assertMatch({ok, _}, emqx:update_config([gateway], Raw2)),
|
||||||
|
assert_confs(StompConf2, emqx:get_raw_config([gateway, stomp])),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
config :=
|
||||||
|
#{
|
||||||
|
authentication := #{backend := built_in_database, enable := true},
|
||||||
|
listeners := #{tcp := #{default := #{bind := 61613}}},
|
||||||
|
idle_timeout := 20000,
|
||||||
|
mountpoint := <<"t2/">>
|
||||||
|
}
|
||||||
|
},
|
||||||
|
emqx_gateway:lookup('stomp')
|
||||||
|
),
|
||||||
|
%% reset
|
||||||
|
?assertMatch({ok, _}, emqx:update_config([gateway], Raw0)),
|
||||||
|
?assertEqual(undefined, emqx_gateway:lookup('stomp')),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_load_remove_authn(_) ->
|
t_load_remove_authn(_) ->
|
||||||
|
@ -310,6 +352,40 @@ t_load_remove_authn(_) ->
|
||||||
{config_not_found, [<<"gateway">>, stomp, authentication]},
|
{config_not_found, [<<"gateway">>, stomp, authentication]},
|
||||||
emqx:get_raw_config([gateway, stomp, authentication])
|
emqx:get_raw_config([gateway, stomp, authentication])
|
||||||
),
|
),
|
||||||
|
%% test update([gateway], Conf)
|
||||||
|
Raw0 = emqx:get_raw_config([gateway]),
|
||||||
|
#{<<"listeners">> := StompConfL} = StompConf,
|
||||||
|
StompConf1 = StompConf#{
|
||||||
|
<<"listeners">> => emqx_gateway_conf:unconvert_listeners(StompConfL),
|
||||||
|
<<"authentication">> => ?CONF_STOMP_AUTHN_1
|
||||||
|
},
|
||||||
|
Raw1 = maps:put(<<"stomp">>, StompConf1, Raw0),
|
||||||
|
?assertMatch({ok, _}, emqx:update_config([gateway], Raw1)),
|
||||||
|
assert_confs(StompConf1, emqx:get_raw_config([gateway, stomp])),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
stomp :=
|
||||||
|
#{
|
||||||
|
authn := <<"password_based:built_in_database">>,
|
||||||
|
listeners := [#{authn := <<"undefined">>, type := tcp}],
|
||||||
|
num_clients := 0
|
||||||
|
}
|
||||||
|
},
|
||||||
|
emqx_gateway:get_basic_usage_info()
|
||||||
|
),
|
||||||
|
%% reset(remove authn)
|
||||||
|
?assertMatch({ok, _}, emqx:update_config([gateway], Raw0)),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
stomp :=
|
||||||
|
#{
|
||||||
|
authn := <<"undefined">>,
|
||||||
|
listeners := [#{authn := <<"undefined">>, type := tcp}],
|
||||||
|
num_clients := 0
|
||||||
|
}
|
||||||
|
},
|
||||||
|
emqx_gateway:get_basic_usage_info()
|
||||||
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_load_remove_listeners(_) ->
|
t_load_remove_listeners(_) ->
|
||||||
|
@ -324,6 +400,7 @@ t_load_remove_listeners(_) ->
|
||||||
{<<"tcp">>, <<"default">>},
|
{<<"tcp">>, <<"default">>},
|
||||||
?CONF_STOMP_LISTENER_1
|
?CONF_STOMP_LISTENER_1
|
||||||
),
|
),
|
||||||
|
|
||||||
assert_confs(
|
assert_confs(
|
||||||
maps:merge(StompConf, listener(?CONF_STOMP_LISTENER_1)),
|
maps:merge(StompConf, listener(?CONF_STOMP_LISTENER_1)),
|
||||||
emqx:get_raw_config([gateway, stomp])
|
emqx:get_raw_config([gateway, stomp])
|
||||||
|
@ -355,6 +432,59 @@ t_load_remove_listeners(_) ->
|
||||||
{config_not_found, [<<"gateway">>, stomp, listeners, tcp, default]},
|
{config_not_found, [<<"gateway">>, stomp, listeners, tcp, default]},
|
||||||
emqx:get_raw_config([gateway, stomp, listeners, tcp, default])
|
emqx:get_raw_config([gateway, stomp, listeners, tcp, default])
|
||||||
),
|
),
|
||||||
|
%% test update([gateway], Conf)
|
||||||
|
Raw0 = emqx:get_raw_config([gateway]),
|
||||||
|
Raw1 = emqx_utils_maps:deep_put(
|
||||||
|
[<<"stomp">>, <<"listeners">>, <<"tcp">>, <<"default">>], Raw0, ?CONF_STOMP_LISTENER_1
|
||||||
|
),
|
||||||
|
?assertMatch({ok, _}, emqx:update_config([gateway], Raw1)),
|
||||||
|
assert_confs(
|
||||||
|
maps:merge(StompConf, listener(?CONF_STOMP_LISTENER_1)),
|
||||||
|
emqx:get_raw_config([gateway, stomp])
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
stomp :=
|
||||||
|
#{
|
||||||
|
authn := <<"password_based:built_in_database">>,
|
||||||
|
listeners := [#{authn := <<"undefined">>, type := tcp}],
|
||||||
|
num_clients := 0
|
||||||
|
}
|
||||||
|
},
|
||||||
|
emqx_gateway:get_basic_usage_info()
|
||||||
|
),
|
||||||
|
Raw2 = emqx_utils_maps:deep_put(
|
||||||
|
[<<"stomp">>, <<"listeners">>, <<"tcp">>, <<"default">>], Raw0, ?CONF_STOMP_LISTENER_2
|
||||||
|
),
|
||||||
|
?assertMatch({ok, _}, emqx:update_config([gateway], Raw2)),
|
||||||
|
assert_confs(
|
||||||
|
maps:merge(StompConf, listener(?CONF_STOMP_LISTENER_2)),
|
||||||
|
emqx:get_raw_config([gateway, stomp])
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
stomp :=
|
||||||
|
#{
|
||||||
|
authn := <<"password_based:built_in_database">>,
|
||||||
|
listeners := [#{authn := <<"undefined">>, type := tcp}],
|
||||||
|
num_clients := 0
|
||||||
|
}
|
||||||
|
},
|
||||||
|
emqx_gateway:get_basic_usage_info()
|
||||||
|
),
|
||||||
|
%% reset(remove listener)
|
||||||
|
?assertMatch({ok, _}, emqx:update_config([gateway], Raw0)),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
stomp :=
|
||||||
|
#{
|
||||||
|
authn := <<"password_based:built_in_database">>,
|
||||||
|
listeners := [],
|
||||||
|
num_clients := 0
|
||||||
|
}
|
||||||
|
},
|
||||||
|
emqx_gateway:get_basic_usage_info()
|
||||||
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_load_remove_listener_authn(_) ->
|
t_load_remove_listener_authn(_) ->
|
||||||
|
@ -417,6 +547,7 @@ t_load_gateway_with_certs_content(_) ->
|
||||||
[<<"listeners">>, <<"ssl">>, <<"default">>, <<"ssl_options">>],
|
[<<"listeners">>, <<"ssl">>, <<"default">>, <<"ssl_options">>],
|
||||||
emqx:get_raw_config([gateway, stomp])
|
emqx:get_raw_config([gateway, stomp])
|
||||||
),
|
),
|
||||||
|
assert_ssl_confs_files_exist(SslConf),
|
||||||
ok = emqx_gateway_conf:unload_gateway(<<"stomp">>),
|
ok = emqx_gateway_conf:unload_gateway(<<"stomp">>),
|
||||||
assert_ssl_confs_files_deleted(SslConf),
|
assert_ssl_confs_files_deleted(SslConf),
|
||||||
?assertException(
|
?assertException(
|
||||||
|
@ -424,6 +555,25 @@ t_load_gateway_with_certs_content(_) ->
|
||||||
{config_not_found, [<<"gateway">>, stomp]},
|
{config_not_found, [<<"gateway">>, stomp]},
|
||||||
emqx:get_raw_config([gateway, stomp])
|
emqx:get_raw_config([gateway, stomp])
|
||||||
),
|
),
|
||||||
|
%% test update([gateway], Conf)
|
||||||
|
Raw0 = emqx:get_raw_config([gateway]),
|
||||||
|
#{<<"listeners">> := StompConfL} = StompConf,
|
||||||
|
StompConf1 = StompConf#{
|
||||||
|
<<"listeners">> => emqx_gateway_conf:unconvert_listeners(StompConfL)
|
||||||
|
},
|
||||||
|
Raw1 = emqx_utils_maps:deep_put([<<"stomp">>], Raw0, StompConf1),
|
||||||
|
?assertMatch({ok, _}, emqx:update_config([gateway], Raw1)),
|
||||||
|
assert_ssl_confs_files_exist(SslConf),
|
||||||
|
?assertEqual(
|
||||||
|
SslConf,
|
||||||
|
emqx_utils_maps:deep_get(
|
||||||
|
[<<"listeners">>, <<"ssl">>, <<"default">>, <<"ssl_options">>],
|
||||||
|
emqx:get_raw_config([gateway, stomp])
|
||||||
|
)
|
||||||
|
),
|
||||||
|
%% reset
|
||||||
|
?assertMatch({ok, _}, emqx:update_config([gateway], Raw0)),
|
||||||
|
assert_ssl_confs_files_deleted(SslConf),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%% TODO: Comment out this test case for now, because emqx_tls_lib
|
%% TODO: Comment out this test case for now, because emqx_tls_lib
|
||||||
|
@ -475,6 +625,7 @@ t_add_listener_with_certs_content(_) ->
|
||||||
[<<"listeners">>, <<"ssl">>, <<"default">>, <<"ssl_options">>],
|
[<<"listeners">>, <<"ssl">>, <<"default">>, <<"ssl_options">>],
|
||||||
emqx:get_raw_config([gateway, stomp])
|
emqx:get_raw_config([gateway, stomp])
|
||||||
),
|
),
|
||||||
|
assert_ssl_confs_files_exist(SslConf),
|
||||||
ok = emqx_gateway_conf:remove_listener(
|
ok = emqx_gateway_conf:remove_listener(
|
||||||
<<"stomp">>, {<<"ssl">>, <<"default">>}
|
<<"stomp">>, {<<"ssl">>, <<"default">>}
|
||||||
),
|
),
|
||||||
|
@ -492,6 +643,34 @@ t_add_listener_with_certs_content(_) ->
|
||||||
{config_not_found, [<<"gateway">>, stomp, listeners, ssl, default]},
|
{config_not_found, [<<"gateway">>, stomp, listeners, ssl, default]},
|
||||||
emqx:get_raw_config([gateway, stomp, listeners, ssl, default])
|
emqx:get_raw_config([gateway, stomp, listeners, ssl, default])
|
||||||
),
|
),
|
||||||
|
|
||||||
|
%% test update([gateway], Conf)
|
||||||
|
Raw0 = emqx:get_raw_config([gateway]),
|
||||||
|
Raw1 = emqx_utils_maps:deep_put(
|
||||||
|
[<<"stomp">>, <<"listeners">>, <<"ssl">>, <<"default">>], Raw0, ?CONF_STOMP_LISTENER_SSL
|
||||||
|
),
|
||||||
|
?assertMatch({ok, _}, emqx:update_config([gateway], Raw1)),
|
||||||
|
SslConf1 = emqx_utils_maps:deep_get(
|
||||||
|
[<<"listeners">>, <<"ssl">>, <<"default">>, <<"ssl_options">>],
|
||||||
|
emqx:get_raw_config([gateway, stomp])
|
||||||
|
),
|
||||||
|
assert_ssl_confs_files_exist(SslConf1),
|
||||||
|
%% update
|
||||||
|
Raw2 = emqx_utils_maps:deep_put(
|
||||||
|
[<<"stomp">>, <<"listeners">>, <<"ssl">>, <<"default">>], Raw0, ?CONF_STOMP_LISTENER_SSL_2
|
||||||
|
),
|
||||||
|
?assertMatch({ok, _}, emqx:update_config([gateway], Raw2)),
|
||||||
|
SslConf2 =
|
||||||
|
emqx_utils_maps:deep_get(
|
||||||
|
[<<"listeners">>, <<"ssl">>, <<"default">>, <<"ssl_options">>],
|
||||||
|
emqx:get_raw_config([gateway, stomp])
|
||||||
|
),
|
||||||
|
assert_ssl_confs_files_exist(SslConf2),
|
||||||
|
%% reset
|
||||||
|
?assertMatch({ok, _}, emqx:update_config([gateway], Raw0)),
|
||||||
|
assert_ssl_confs_files_deleted(SslConf),
|
||||||
|
assert_ssl_confs_files_deleted(SslConf1),
|
||||||
|
assert_ssl_confs_files_deleted(SslConf2),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
assert_ssl_confs_files_deleted(SslConf) when is_map(SslConf) ->
|
assert_ssl_confs_files_deleted(SslConf) when is_map(SslConf) ->
|
||||||
|
@ -503,6 +682,15 @@ assert_ssl_confs_files_deleted(SslConf) when is_map(SslConf) ->
|
||||||
end,
|
end,
|
||||||
Ks
|
Ks
|
||||||
).
|
).
|
||||||
|
assert_ssl_confs_files_exist(SslConf) when is_map(SslConf) ->
|
||||||
|
Ks = [<<"cacertfile">>, <<"certfile">>, <<"keyfile">>],
|
||||||
|
lists:foreach(
|
||||||
|
fun(K) ->
|
||||||
|
Path = maps:get(K, SslConf),
|
||||||
|
{ok, _} = file:read_file(Path)
|
||||||
|
end,
|
||||||
|
Ks
|
||||||
|
).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Utils
|
%% Utils
|
||||||
|
|
|
@ -83,13 +83,13 @@ do_assert_confs(Key, Expected, Effected) ->
|
||||||
|
|
||||||
maybe_unconvert_listeners(Conf) when is_map(Conf) ->
|
maybe_unconvert_listeners(Conf) when is_map(Conf) ->
|
||||||
case maps:take(<<"listeners">>, Conf) of
|
case maps:take(<<"listeners">>, Conf) of
|
||||||
error ->
|
{Ls, Conf1} when is_list(Ls) ->
|
||||||
Conf;
|
|
||||||
{Ls, Conf1} ->
|
|
||||||
Conf1#{
|
Conf1#{
|
||||||
<<"listeners">> =>
|
<<"listeners">> =>
|
||||||
emqx_gateway_conf:unconvert_listeners(Ls)
|
emqx_gateway_conf:unconvert_listeners(Ls)
|
||||||
}
|
};
|
||||||
|
_ ->
|
||||||
|
Conf
|
||||||
end;
|
end;
|
||||||
maybe_unconvert_listeners(Conf) ->
|
maybe_unconvert_listeners(Conf) ->
|
||||||
Conf.
|
Conf.
|
||||||
|
|
Loading…
Reference in New Issue