refactor(gw): put conf operations into emqx_gateway_conf mod

This commit is contained in:
JianBo He 2021-09-29 19:25:53 +08:00
parent 642b588ad0
commit adaf3db719
5 changed files with 105 additions and 99 deletions

View File

@ -93,16 +93,13 @@ gateway_insta(delete, #{bindings := #{name := Name0}}) ->
end); end);
gateway_insta(get, #{bindings := #{name := Name0}}) -> gateway_insta(get, #{bindings := #{name := Name0}}) ->
with_gateway(Name0, fun(_, _) -> with_gateway(Name0, fun(_, _) ->
GwConf = filled_raw_confs([<<"gateway">>, Name0]), GwConf = emqx_gateway_conf:gateway(Name0),
LisConf = maps:get(<<"listeners">>, GwConf, #{}), {200, GwConf#{<<"name">> => Name0}}
NLisConf = emqx_gateway_http:mapping_listener_m2l(Name0, LisConf),
{200, GwConf#{<<"name">> => Name0, <<"listeners">> => NLisConf}}
end); end);
gateway_insta(put, #{body := GwConf0, gateway_insta(put, #{body := GwConf,
bindings := #{name := Name0} bindings := #{name := Name0}
}) -> }) ->
with_gateway(Name0, fun(GwName, _) -> with_gateway(Name0, fun(GwName, _) ->
GwConf = maps:without([<<"authentication">>, <<"listeners">>], GwConf0),
case emqx_gateway_conf:update_gateway(GwName, GwConf) of case emqx_gateway_conf:update_gateway(GwName, GwConf) of
ok -> ok ->
{200}; {200};
@ -114,13 +111,6 @@ gateway_insta(put, #{body := GwConf0,
gateway_insta_stats(get, _Req) -> gateway_insta_stats(get, _Req) ->
return_http_error(401, "Implement it later (maybe 5.1)"). return_http_error(401, "Implement it later (maybe 5.1)").
filled_raw_confs(Path) ->
RawConf = emqx_config:fill_defaults(
emqx_config:get_root_raw(Path)
),
Confs = emqx_map_lib:deep_get(Path, RawConf),
emqx_map_lib:jsonable_map(Confs).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Swagger defines %% Swagger defines
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -57,7 +57,7 @@ apis() ->
listeners(get, #{bindings := #{name := Name0}}) -> listeners(get, #{bindings := #{name := Name0}}) ->
with_gateway(Name0, fun(GwName, _) -> with_gateway(Name0, fun(GwName, _) ->
{200, emqx_gateway_http:listeners(GwName)} {200, emqx_gateway_conf:listeners(GwName)}
end); end);
listeners(post, #{bindings := #{name := Name0}, body := LConf}) -> listeners(post, #{bindings := #{name := Name0}, body := LConf}) ->
@ -90,7 +90,7 @@ listeners_insta(delete, #{bindings := #{name := Name0, id := ListenerId0}}) ->
listeners_insta(get, #{bindings := #{name := Name0, id := ListenerId0}}) -> listeners_insta(get, #{bindings := #{name := Name0, id := ListenerId0}}) ->
ListenerId = emqx_mgmt_util:urldecode(ListenerId0), ListenerId = emqx_mgmt_util:urldecode(ListenerId0),
with_gateway(Name0, fun(_GwName, _) -> with_gateway(Name0, fun(_GwName, _) ->
case emqx_gateway_http:listener(ListenerId) of case emqx_gateway_conf:listener(ListenerId) of
{ok, Listener} -> {ok, Listener} ->
{200, Listener}; {200, Listener};
{error, not_found} -> {error, not_found} ->

View File

@ -23,13 +23,20 @@
]). ]).
%% APIs %% APIs
-export([ load_gateway/2 -export([ gateway/1
, load_gateway/2
, update_gateway/2 , update_gateway/2
, unload_gateway/1 , unload_gateway/1
]).
-export([ listeners/1
, listener/1
, add_listener/3 , add_listener/3
, update_listener/3 , update_listener/3
, remove_listener/2 , remove_listener/2
, add_authn/2 ]).
-export([ add_authn/2
, add_authn/3 , add_authn/3
, update_authn/2 , update_authn/2
, update_authn/3 , update_authn/3
@ -67,20 +74,16 @@ load_gateway(GwName, Conf) ->
NConf = case maps:take(<<"listeners">>, Conf) of NConf = case maps:take(<<"listeners">>, Conf) of
error -> Conf; error -> Conf;
{Ls, Conf1} -> {Ls, Conf1} ->
Conf1#{<<"listeners">> => mapping(Ls)} Conf1#{<<"listeners">> => unconvert_listeners(Ls)}
end, end,
update({?FUNCTION_NAME, bin(GwName), NConf}). update({?FUNCTION_NAME, bin(GwName), NConf}).
mapping(Ls) when is_list(Ls) -> %% @doc convert listener array to map
convert_to_map(Ls); unconvert_listeners(Ls) when is_list(Ls) ->
mapping(Ls) when is_map(Ls) ->
Ls.
convert_to_map(Listeners) when is_list(Listeners) ->
lists:foldl(fun(Lis, Acc) -> lists:foldl(fun(Lis, Acc) ->
{[Type, Name], Lis1} = maps_key_take([<<"type">>, <<"name">>], Lis), {[Type, Name], Lis1} = maps_key_take([<<"type">>, <<"name">>], Lis),
emqx_map_lib:deep_merge(Acc, #{Type => #{Name => Lis1}}) emqx_map_lib:deep_merge(Acc, #{Type => #{Name => Lis1}})
end, #{}, Listeners). end, #{}, Ls).
maps_key_take(Ks, M) -> maps_key_take(Ks, M) ->
maps_key_take(Ks, M, []). maps_key_take(Ks, M, []).
@ -103,6 +106,81 @@ update_gateway(GwName, Conf0) ->
unload_gateway(GwName) -> unload_gateway(GwName) ->
update({?FUNCTION_NAME, bin(GwName)}). update({?FUNCTION_NAME, bin(GwName)}).
%% @doc Get the gateway configurations.
%% Missing fields are filled with default values. This function is typically
%% used to show the user what configuration value is currently in effect.
-spec gateway(atom_or_bin()) -> map().
gateway(GwName0) ->
GwName = bin(GwName0),
Path = [<<"gateway">>, GwName],
RawConf = emqx_config:fill_defaults(
emqx_config:get_root_raw(Path)
),
Confs = emqx_map_lib:jsonable_map(
emqx_map_lib:deep_get(Path, RawConf)),
LsConf = maps:get(<<"listeners">>, Confs, #{}),
Confs#{<<"listeners">> => convert_listeners(GwName, LsConf)}.
%% @doc convert listeners map to array
convert_listeners(GwName, Ls) when is_map(Ls) ->
lists:append([do_convert_listener(GwName, Type, maps:to_list(Conf))
|| {Type, Conf} <- maps:to_list(Ls)]).
do_convert_listener(GwName, Type, Conf) ->
[begin
ListenerId = emqx_gateway_utils:listener_id(GwName, Type, LName),
Running = emqx_gateway_utils:is_running(ListenerId, LConf),
bind2str(
LConf#{
id => ListenerId,
type => Type,
name => LName,
running => Running
})
end || {LName, LConf} <- Conf, is_map(LConf)].
bind2str(LConf = #{bind := Bind}) when is_integer(Bind) ->
maps:put(bind, integer_to_binary(Bind), LConf);
bind2str(LConf = #{<<"bind">> := Bind}) when is_integer(Bind) ->
maps:put(<<"bind">>, integer_to_binary(Bind), LConf);
bind2str(LConf = #{bind := Bind}) when is_binary(Bind) ->
LConf;
bind2str(LConf = #{<<"bind">> := Bind}) when is_binary(Bind) ->
LConf.
-spec listeners(atom_or_bin()) -> map().
listeners(GwName0) ->
GwName = bin(GwName0),
RawConf = emqx_config:fill_defaults(
emqx_config:get_root_raw([<<"gateway">>])),
Listeners = emqx_map_lib:jsonable_map(
emqx_map_lib:deep_get(
[<<"gateway">>, GwName, <<"listeners">>], RawConf)),
convert_listeners(GwName, Listeners).
-spec listener(binary()) -> {ok, map()} | {error, not_found} | {error, any()}.
listener(ListenerId) ->
{GwName, Type, LName} = emqx_gateway_utils:parse_listener_id(ListenerId),
RootConf = emqx_config:fill_defaults(
emqx_config:get_root_raw([<<"gateway">>])),
try
Path = [<<"gateway">>, GwName, <<"listeners">>, Type, LName],
LConf = emqx_map_lib:deep_get(Path, RootConf),
Running = emqx_gateway_utils:is_running(
binary_to_existing_atom(ListenerId), LConf),
{ok, emqx_map_lib:jsonable_map(
LConf#{
id => ListenerId,
type => Type,
name => LName,
running => Running})}
catch
error : {config_not_found, _} ->
{error, not_found};
_Class : Reason ->
{error, Reason}
end.
-spec add_listener(atom_or_bin(), listener_ref(), map()) -> ok_or_err(). -spec add_listener(atom_or_bin(), listener_ref(), map()) -> ok_or_err().
add_listener(GwName, ListenerRef, Conf) -> add_listener(GwName, ListenerRef, Conf) ->
update({?FUNCTION_NAME, bin(GwName), bin(ListenerRef), Conf}). update({?FUNCTION_NAME, bin(GwName), bin(ListenerRef), Conf}).

View File

@ -24,13 +24,10 @@
-export([ gateways/1 -export([ gateways/1
]). ]).
%% Mgmt APIs - listeners %% Mgmt APIs
-export([ listeners/1 -export([ add_listener/2
, listener/1
, add_listener/2
, remove_listener/1 , remove_listener/1
, update_listener/2 , update_listener/2
, mapping_listener_m2l/2
]). ]).
-export([ authn/1 -export([ authn/1
@ -140,75 +137,6 @@ get_listeners_status(GwName, Config) ->
%% Mgmt APIs - listeners %% Mgmt APIs - listeners
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec listeners(atom() | binary()) -> list().
listeners(GwName) when is_atom(GwName) ->
listeners(atom_to_binary(GwName));
listeners(GwName) ->
RawConf = emqx_config:fill_defaults(
emqx_config:get_root_raw([<<"gateway">>])),
Listeners = emqx_map_lib:jsonable_map(
emqx_map_lib:deep_get(
[<<"gateway">>, GwName, <<"listeners">>], RawConf)),
mapping_listener_m2l(GwName, Listeners).
-spec listener(binary()) -> {ok, map()} | {error, not_found} | {error, any()}.
listener(ListenerId) ->
{GwName, Type, LName} = emqx_gateway_utils:parse_listener_id(ListenerId),
RootConf = emqx_config:fill_defaults(
emqx_config:get_root_raw([<<"gateway">>])),
try
Path = [<<"gateway">>, GwName, <<"listeners">>, Type, LName],
LConf = emqx_map_lib:deep_get(Path, RootConf),
Running = is_running(binary_to_existing_atom(ListenerId), LConf),
{ok, emqx_map_lib:jsonable_map(
LConf#{
id => ListenerId,
type => Type,
name => LName,
running => Running})}
catch
error : {config_not_found, _} ->
{error, not_found};
_Class : Reason ->
{error, Reason}
end.
mapping_listener_m2l(GwName, Listeners0) ->
Listeners = maps:to_list(Listeners0),
lists:append([listener(GwName, Type, maps:to_list(Conf))
|| {Type, Conf} <- Listeners]).
listener(GwName, Type, Conf) ->
[begin
ListenerId = emqx_gateway_utils:listener_id(GwName, Type, LName),
Running = is_running(ListenerId, LConf),
bind2str(
LConf#{
id => ListenerId,
type => Type,
name => LName,
running => Running
})
end || {LName, LConf} <- Conf, is_map(LConf)].
is_running(ListenerId, #{<<"bind">> := ListenOn0}) ->
ListenOn = emqx_gateway_utils:parse_listenon(ListenOn0),
try esockd:listener({ListenerId, ListenOn}) of
Pid when is_pid(Pid)->
true
catch _:_ ->
false
end.
bind2str(LConf = #{bind := Bind}) when is_integer(Bind) ->
maps:put(bind, integer_to_binary(Bind), LConf);
bind2str(LConf = #{<<"bind">> := Bind}) when is_integer(Bind) ->
maps:put(<<"bind">>, integer_to_binary(Bind), LConf);
bind2str(LConf = #{bind := Bind}) when is_binary(Bind) ->
LConf;
bind2str(LConf = #{<<"bind">> := Bind}) when is_binary(Bind) ->
LConf.
-spec add_listener(atom() | binary(), map()) -> ok. -spec add_listener(atom() | binary(), map()) -> ok.
add_listener(ListenerId, NewConf0) -> add_listener(ListenerId, NewConf0) ->
{GwName, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId), {GwName, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId),

View File

@ -33,6 +33,7 @@
, unix_ts_to_rfc3339/2 , unix_ts_to_rfc3339/2
, listener_id/3 , listener_id/3
, parse_listener_id/1 , parse_listener_id/1
, is_running/2
]). ]).
-export([ stringfy/1 -export([ stringfy/1
@ -148,6 +149,15 @@ parse_listener_id(Id) ->
_ : _ -> error({invalid_listener_id, Id}) _ : _ -> error({invalid_listener_id, Id})
end. end.
is_running(ListenerId, #{<<"bind">> := ListenOn0}) ->
ListenOn = emqx_gateway_utils:parse_listenon(ListenOn0),
try esockd:listener({ListenerId, ListenOn}) of
Pid when is_pid(Pid)->
true
catch _:_ ->
false
end.
bin(A) when is_atom(A) -> bin(A) when is_atom(A) ->
atom_to_binary(A); atom_to_binary(A);
bin(L) when is_list(L); is_binary(L) -> bin(L) when is_list(L); is_binary(L) ->