Merge pull request #6567 from HJianBo/gw-review-r4
This commit is contained in:
commit
126924bc54
|
@ -275,7 +275,7 @@ ret_gw(GwName, {ok, #{raw_config := GwConf}}) ->
|
||||||
end, maps:to_list(SubConf)),
|
end, maps:to_list(SubConf)),
|
||||||
[NLConfs | Acc]
|
[NLConfs | Acc]
|
||||||
end, [], maps:to_list(LsConf)),
|
end, [], maps:to_list(LsConf)),
|
||||||
{ok, maps:merge(GwConf1, #{<<"listeners">> => NLsConf})};
|
{ok, maps:merge(GwConf1, #{<<"listeners">> => lists:append(NLsConf)})};
|
||||||
ret_gw(_GwName, Err) -> Err.
|
ret_gw(_GwName, Err) -> Err.
|
||||||
|
|
||||||
ret_authn(GwName, {ok, #{raw_config := GwConf}}) ->
|
ret_authn(GwName, {ok, #{raw_config := GwConf}}) ->
|
||||||
|
|
|
@ -46,7 +46,6 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ stringfy/1
|
-export([ stringfy/1
|
||||||
, parse_address/1
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ normalize_config/1
|
-export([ normalize_config/1
|
||||||
|
@ -332,19 +331,6 @@ stringfy(T) when is_list(T); is_binary(T) ->
|
||||||
stringfy(T) ->
|
stringfy(T) ->
|
||||||
iolist_to_binary(io_lib:format("~0p", [T])).
|
iolist_to_binary(io_lib:format("~0p", [T])).
|
||||||
|
|
||||||
-spec parse_address(binary() | list()) -> {list(), integer()}.
|
|
||||||
parse_address(S) when is_binary(S); is_list(S) ->
|
|
||||||
S1 = case is_binary(S) of
|
|
||||||
true -> lists:reverse(binary_to_list(S));
|
|
||||||
_ -> lists:reverse(S)
|
|
||||||
end,
|
|
||||||
case re:split(S1, ":", [{parts, 2}, {return, list}]) of
|
|
||||||
[Port0, Host0] ->
|
|
||||||
{lists:reverse(Host0), list_to_integer(lists:reverse(Port0))};
|
|
||||||
_ ->
|
|
||||||
error(badarg)
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec normalize_config(emqx_config:config())
|
-spec normalize_config(emqx_config:config())
|
||||||
-> list({ Type :: udp | tcp | ssl | dtls
|
-> list({ Type :: udp | tcp | ssl | dtls
|
||||||
, Name :: atom()
|
, Name :: atom()
|
||||||
|
|
|
@ -148,13 +148,16 @@ stop_grpc_server(GwName) ->
|
||||||
start_grpc_client_channel(_GwName, undefined) ->
|
start_grpc_client_channel(_GwName, undefined) ->
|
||||||
undefined;
|
undefined;
|
||||||
start_grpc_client_channel(GwName, Options = #{address := Address}) ->
|
start_grpc_client_channel(GwName, Options = #{address := Address}) ->
|
||||||
{Host, Port} = try emqx_gateway_utils:parse_address(Address)
|
#{host := Host, port := Port} =
|
||||||
catch error : badarg ->
|
case emqx_http_lib:uri_parse(Address) of
|
||||||
throw({badconf, #{key => address,
|
{ok, URIMap0} -> URIMap0;
|
||||||
value => Address,
|
{error, _Reason} ->
|
||||||
reason => illegal_grpc_address
|
throw({badconf, #{key => address,
|
||||||
}})
|
value => Address,
|
||||||
end,
|
reason => illegal_grpc_address
|
||||||
|
}})
|
||||||
|
|
||||||
|
end,
|
||||||
case maps:to_list(maps:get(ssl, Options, #{})) of
|
case maps:to_list(maps:get(ssl, Options, #{})) of
|
||||||
[] ->
|
[] ->
|
||||||
SvrAddr = compose_http_uri(http, Host, Port),
|
SvrAddr = compose_http_uri(http, Host, Port),
|
||||||
|
@ -170,7 +173,7 @@ start_grpc_client_channel(GwName, Options = #{address := Address}) ->
|
||||||
compose_http_uri(Scheme, Host, Port) ->
|
compose_http_uri(Scheme, Host, Port) ->
|
||||||
lists:flatten(
|
lists:flatten(
|
||||||
io_lib:format(
|
io_lib:format(
|
||||||
"~s://~s:~w", [Scheme, Host, Port])).
|
"~s://~s:~w", [Scheme, inet:ntoa(Host), Port])).
|
||||||
|
|
||||||
stop_grpc_client_channel(GwName) ->
|
stop_grpc_client_channel(GwName) ->
|
||||||
_ = grpc_client_sup:stop_channel_pool(GwName),
|
_ = grpc_client_sup:stop_channel_pool(GwName),
|
||||||
|
|
|
@ -92,7 +92,7 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
|
||||||
|
|
||||||
on_gateway_unload(_Gateway = #{ name := GwName,
|
on_gateway_unload(_Gateway = #{ name := GwName,
|
||||||
config := Config
|
config := Config
|
||||||
}, _GwState = #{registry := RegPid}) ->
|
}, _GwState = #{registry := _RegPid}) ->
|
||||||
exit(RegPid, kill),
|
_ = try emqx_lwm2m_xml_object_db:stop() catch _ : _ -> ok end,
|
||||||
Listeners = emqx_gateway_utils:normalize_config(Config),
|
Listeners = emqx_gateway_utils:normalize_config(Config),
|
||||||
emqx_gateway_utils:stop_listeners(GwName, Listeners).
|
emqx_gateway_utils:stop_listeners(GwName, Listeners).
|
||||||
|
|
|
@ -69,7 +69,7 @@ set_special_cfg(emqx_gateway) ->
|
||||||
emqx_config:put(
|
emqx_config:put(
|
||||||
[gateway, exproto],
|
[gateway, exproto],
|
||||||
#{server => #{bind => 9100},
|
#{server => #{bind => 9100},
|
||||||
handler => #{address => "127.0.0.1:9001"},
|
handler => #{address => "http://127.0.0.1:9001"},
|
||||||
listeners => listener_confs(LisType)
|
listeners => listener_confs(LisType)
|
||||||
});
|
});
|
||||||
set_special_cfg(_App) ->
|
set_special_cfg(_App) ->
|
||||||
|
|
|
@ -172,7 +172,7 @@ t_gateway_exproto(_) ->
|
||||||
%% post
|
%% post
|
||||||
GwConf = #{name => <<"exproto">>,
|
GwConf = #{name => <<"exproto">>,
|
||||||
server => #{bind => <<"9100">>},
|
server => #{bind => <<"9100">>},
|
||||||
handler => #{address => <<"127.0.0.1:9001">>},
|
handler => #{address => <<"http://127.0.0.1:9001">>},
|
||||||
listeners => [
|
listeners => [
|
||||||
#{name => <<"def">>, type => <<"tcp">>, bind => <<"7993">>}
|
#{name => <<"def">>, type => <<"tcp">>, bind => <<"7993">>}
|
||||||
]
|
]
|
||||||
|
|
Loading…
Reference in New Issue