Merge pull request #6637 from zmstone/merge-5.0-beta.3-to-master

Merge 5.0 beta.3 to master
This commit is contained in:
Zaiming (Stone) Shi 2022-01-05 12:08:48 +01:00 committed by GitHub
commit 7ea1cf40ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 77 additions and 74 deletions

View File

@ -275,7 +275,7 @@ ret_gw(GwName, {ok, #{raw_config := GwConf}}) ->
end, maps:to_list(SubConf)), end, maps:to_list(SubConf)),
[NLConfs | Acc] [NLConfs | Acc]
end, [], maps:to_list(LsConf)), end, [], maps:to_list(LsConf)),
{ok, maps:merge(GwConf1, #{<<"listeners">> => NLsConf})}; {ok, maps:merge(GwConf1, #{<<"listeners">> => lists:append(NLsConf)})};
ret_gw(_GwName, Err) -> Err. ret_gw(_GwName, Err) -> Err.
ret_authn(GwName, {ok, #{raw_config := GwConf}}) -> ret_authn(GwName, {ok, #{raw_config := GwConf}}) ->

View File

@ -46,7 +46,6 @@
]). ]).
-export([ stringfy/1 -export([ stringfy/1
, parse_address/1
]). ]).
-export([ normalize_config/1 -export([ normalize_config/1
@ -332,19 +331,6 @@ stringfy(T) when is_list(T); is_binary(T) ->
stringfy(T) -> stringfy(T) ->
iolist_to_binary(io_lib:format("~0p", [T])). iolist_to_binary(io_lib:format("~0p", [T])).
-spec parse_address(binary() | list()) -> {list(), integer()}.
parse_address(S) when is_binary(S); is_list(S) ->
S1 = case is_binary(S) of
true -> lists:reverse(binary_to_list(S));
_ -> lists:reverse(S)
end,
case re:split(S1, ":", [{parts, 2}, {return, list}]) of
[Port0, Host0] ->
{lists:reverse(Host0), list_to_integer(lists:reverse(Port0))};
_ ->
error(badarg)
end.
-spec normalize_config(emqx_config:config()) -spec normalize_config(emqx_config:config())
-> list({ Type :: udp | tcp | ssl | dtls -> list({ Type :: udp | tcp | ssl | dtls
, Name :: atom() , Name :: atom()

View File

@ -148,12 +148,15 @@ stop_grpc_server(GwName) ->
start_grpc_client_channel(_GwName, undefined) -> start_grpc_client_channel(_GwName, undefined) ->
undefined; undefined;
start_grpc_client_channel(GwName, Options = #{address := Address}) -> start_grpc_client_channel(GwName, Options = #{address := Address}) ->
{Host, Port} = try emqx_gateway_utils:parse_address(Address) #{host := Host, port := Port} =
catch error : badarg -> case emqx_http_lib:uri_parse(Address) of
{ok, URIMap0} -> URIMap0;
{error, _Reason} ->
throw({badconf, #{key => address, throw({badconf, #{key => address,
value => Address, value => Address,
reason => illegal_grpc_address reason => illegal_grpc_address
}}) }})
end, end,
case maps:to_list(maps:get(ssl, Options, #{})) of case maps:to_list(maps:get(ssl, Options, #{})) of
[] -> [] ->
@ -170,7 +173,7 @@ start_grpc_client_channel(GwName, Options = #{address := Address}) ->
compose_http_uri(Scheme, Host, Port) -> compose_http_uri(Scheme, Host, Port) ->
lists:flatten( lists:flatten(
io_lib:format( io_lib:format(
"~s://~s:~w", [Scheme, Host, Port])). "~s://~s:~w", [Scheme, inet:ntoa(Host), Port])).
stop_grpc_client_channel(GwName) -> stop_grpc_client_channel(GwName) ->
_ = grpc_client_sup:stop_channel_pool(GwName), _ = grpc_client_sup:stop_channel_pool(GwName),

View File

@ -92,7 +92,7 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
on_gateway_unload(_Gateway = #{ name := GwName, on_gateway_unload(_Gateway = #{ name := GwName,
config := Config config := Config
}, _GwState = #{registry := RegPid}) -> }, _GwState = #{registry := _RegPid}) ->
exit(RegPid, kill), _ = try emqx_lwm2m_xml_object_db:stop() catch _ : _ -> ok end,
Listeners = emqx_gateway_utils:normalize_config(Config), Listeners = emqx_gateway_utils:normalize_config(Config),
emqx_gateway_utils:stop_listeners(GwName, Listeners). emqx_gateway_utils:stop_listeners(GwName, Listeners).

View File

@ -69,7 +69,7 @@ set_special_cfg(emqx_gateway) ->
emqx_config:put( emqx_config:put(
[gateway, exproto], [gateway, exproto],
#{server => #{bind => 9100}, #{server => #{bind => 9100},
handler => #{address => "127.0.0.1:9001"}, handler => #{address => "http://127.0.0.1:9001"},
listeners => listener_confs(LisType) listeners => listener_confs(LisType)
}); });
set_special_cfg(_App) -> set_special_cfg(_App) ->

View File

@ -172,7 +172,7 @@ t_gateway_exproto(_) ->
%% post %% post
GwConf = #{name => <<"exproto">>, GwConf = #{name => <<"exproto">>,
server => #{bind => <<"9100">>}, server => #{bind => <<"9100">>},
handler => #{address => <<"127.0.0.1:9001">>}, handler => #{address => <<"http://127.0.0.1:9001">>},
listeners => [ listeners => [
#{name => <<"def">>, type => <<"tcp">>, bind => <<"7993">>} #{name => <<"def">>, type => <<"tcp">>, bind => <<"7993">>}
] ]

View File

@ -139,13 +139,7 @@ api_key(post, #{body := App}) ->
<<"desc">> := Desc0, <<"desc">> := Desc0,
<<"enable">> := Enable <<"enable">> := Enable
} = App, } = App,
%% undefined is never expired ExpiredAt = ensure_expired_at(App),
ExpiredAt0 = maps:get(<<"expired_at">>, App, <<"undefined">>),
ExpiredAt =
case ExpiredAt0 of
<<"undefined">> -> undefined;
_ -> ExpiredAt0
end,
Desc = unicode:characters_to_binary(Desc0, unicode), Desc = unicode:characters_to_binary(Desc0, unicode),
case emqx_mgmt_auth:create(Name, Enable, ExpiredAt, Desc) of case emqx_mgmt_auth:create(Name, Enable, ExpiredAt, Desc) of
{ok, NewApp} -> {200, format(NewApp)}; {ok, NewApp} -> {200, format(NewApp)};
@ -164,7 +158,7 @@ api_key_by_name(delete, #{bindings := #{name := Name}}) ->
end; end;
api_key_by_name(put, #{bindings := #{name := Name}, body := Body}) -> api_key_by_name(put, #{bindings := #{name := Name}, body := Body}) ->
Enable = maps:get(<<"enable">>, Body, undefined), Enable = maps:get(<<"enable">>, Body, undefined),
ExpiredAt = maps:get(<<"expired_at">>, Body, undefined), ExpiredAt = ensure_expired_at(Body),
Desc = maps:get(<<"desc">>, Body, undefined), Desc = maps:get(<<"desc">>, Body, undefined),
case emqx_mgmt_auth:update(Name, Enable, ExpiredAt, Desc) of case emqx_mgmt_auth:update(Name, Enable, ExpiredAt, Desc) of
{ok, App} -> {200, format(App)}; {ok, App} -> {200, format(App)};
@ -181,3 +175,6 @@ format(App = #{expired_at := ExpiredAt0, created_at := CreateAt}) ->
expired_at => ExpiredAt, expired_at => ExpiredAt,
created_at => list_to_binary(calendar:system_time_to_rfc3339(CreateAt)) created_at => list_to_binary(calendar:system_time_to_rfc3339(CreateAt))
}. }.
ensure_expired_at(#{<<"expired_at">> := ExpiredAt})when is_integer(ExpiredAt) -> ExpiredAt;
ensure_expired_at(_) -> undefined.

View File

@ -304,13 +304,16 @@ download_trace_log(get, #{bindings := #{name := Name}}) ->
Zips = group_trace_file(ZipDir, TraceLog, TraceFiles), Zips = group_trace_file(ZipDir, TraceLog, TraceFiles),
ZipFileName = ZipDir ++ binary_to_list(Name) ++ ".zip", ZipFileName = ZipDir ++ binary_to_list(Name) ++ ".zip",
{ok, ZipFile} = zip:zip(ZipFileName, Zips, [{cwd, ZipDir}]), {ok, ZipFile} = zip:zip(ZipFileName, Zips, [{cwd, ZipDir}]),
emqx_trace:delete_files_after_send(ZipFileName, Zips), %% emqx_trace:delete_files_after_send(ZipFileName, Zips),
%% TODO use file replace file_binary.(delete file after send is not ready now).
{ok, Binary} = file:read_file(ZipFile),
ZipName = filename:basename(ZipFile),
_ = file:delete(ZipFile),
Headers = #{ Headers = #{
<<"content-type">> => <<"application/x-zip">>, <<"content-type">> => <<"application/x-zip">>,
<<"content-disposition">> => <<"content-disposition">> => iolist_to_binary("attachment; filename=" ++ ZipName)
iolist_to_binary("attachment; filename=" ++ filename:basename(ZipFile))
}, },
{200, Headers, {file, ZipFile}}; {200, Headers, {file_binary, ZipName, Binary}};
{error, not_found} -> ?NOT_FOUND(Name) {error, not_found} -> ?NOT_FOUND(Name)
end. end.

View File

@ -68,11 +68,11 @@ update(Name, Enable, ExpiredAt, Desc) ->
Fun = fun() -> Fun = fun() ->
case mnesia:read(?APP, Name, write) of case mnesia:read(?APP, Name, write) of
[] -> mnesia:abort(not_found); [] -> mnesia:abort(not_found);
[App0 = #?APP{enable = Enable0, expired_at = ExpiredAt0, desc = Desc0}] -> [App0 = #?APP{enable = Enable0, desc = Desc0}] ->
App = App =
App0#?APP{ App0#?APP{
expired_at = ExpiredAt,
enable = ensure_not_undefined(Enable, Enable0), enable = ensure_not_undefined(Enable, Enable0),
expired_at = ensure_not_undefined(ExpiredAt, ExpiredAt0),
desc = ensure_not_undefined(Desc, Desc0) desc = ensure_not_undefined(Desc, Desc0)
}, },
ok = mnesia:write(App), ok = mnesia:write(App),

View File

@ -96,6 +96,13 @@ t_update(_Config) ->
?assertEqual(calendar:rfc3339_to_system_time(binary_to_list(ExpiredAt)), ?assertEqual(calendar:rfc3339_to_system_time(binary_to_list(ExpiredAt)),
calendar:rfc3339_to_system_time(binary_to_list(maps:get(<<"expired_at">>, Update1))) calendar:rfc3339_to_system_time(binary_to_list(maps:get(<<"expired_at">>, Update1)))
), ),
Unexpired1 = maps:without([expired_at], Change),
{ok, Update2} = update_app(Name, Unexpired1),
?assertEqual(<<"undefined">>, maps:get(<<"expired_at">>, Update2)),
Unexpired2 = Change#{expired_at => <<"undefined">>},
{ok, Update3} = update_app(Name, Unexpired2),
?assertEqual(<<"undefined">>, maps:get(<<"expired_at">>, Update3)),
?assertEqual({error, {"HTTP/1.1", 404, "Not Found"}}, update_app(<<"Not-Exist">>, Change)), ?assertEqual({error, {"HTTP/1.1", 404, "Not Found"}}, update_app(<<"Not-Exist">>, Change)),
ok. ok.
@ -137,6 +144,10 @@ t_authorize(_Config) ->
}, },
?assertMatch({ok, #{<<"api_key">> := _, <<"enable">> := true}}, update_app(Name, Expired)), ?assertMatch({ok, #{<<"api_key">> := _, <<"enable">> := true}}, update_app(Name, Expired)),
?assertEqual(Unauthorized, emqx_mgmt_api_test_util:request_api(get, BanPath, BasicHeader)), ?assertEqual(Unauthorized, emqx_mgmt_api_test_util:request_api(get, BanPath, BasicHeader)),
UnExpired = #{expired_at => undefined},
?assertMatch({ok, #{<<"api_key">> := _, <<"expired_at">> := <<"undefined">>}},
update_app(Name, UnExpired)),
{ok, _Status1} = emqx_mgmt_api_test_util:request_api(get, BanPath, BasicHeader),
ok. ok.
t_create_unexpired_app(_Config) -> t_create_unexpired_app(_Config) ->

View File

@ -13,10 +13,9 @@ etc/emqx_retainer.conf:
## Where to store the retained messages. ## Where to store the retained messages.
## Notice that all nodes in a cluster are to have the same storage_type. ## Notice that all nodes in a cluster are to have the same storage_type.
## ##
## Value: ram | disc | disc_only ## Value: ram | disc
## - ram: memory only ## - ram: memory only
## - disc: both memory and disc ## - disc: both memory and disc
## - disc_only: disc only
## ##
## Default: ram ## Default: ram
retainer.storage_type = ram retainer.storage_type = ram
@ -56,4 +55,3 @@ Author
------ ------
EMQ X Team EMQ X Team

View File

@ -74,7 +74,7 @@ emqx_retainer {
type = built_in_database type = built_in_database
## storage_type: ram | disc | disc_only ## storage_type: ram | disc
storage_type = ram storage_type = ram
## Maximum number of retained messages. 0 means no limit. ## Maximum number of retained messages. 0 means no limit.

View File

@ -45,24 +45,22 @@
create_resource(#{storage_type := StorageType}) -> create_resource(#{storage_type := StorageType}) ->
Copies = case StorageType of Copies = case StorageType of
ram -> ram_copies; ram -> ram_copies;
disc -> disc_copies; disc -> disc_copies
disc_only -> disc_only_copies
end,
TableType = case StorageType of
disc_only -> set;
_ -> ordered_set
end, end,
StoreProps = [{ets, [compressed, StoreProps = [{ets, [compressed,
{read_concurrency, true}, {read_concurrency, true},
{write_concurrency, true}]}, {write_concurrency, true}]},
{dets, [{auto_save, 1000}]}], {dets, [{auto_save, 1000}]}],
ok = mria:create_table(?TAB, [ ok = mria:create_table(?TAB, [
{type, TableType}, {type, ordered_set},
{rlog_shard, ?RETAINER_SHARD}, {rlog_shard, ?RETAINER_SHARD},
{storage, Copies}, {storage, Copies},
{record_name, retained}, {record_name, retained},
{attributes, record_info(fields, retained)}, {attributes, record_info(fields, retained)},
{storage_properties, StoreProps}]), {storage_properties, StoreProps}
]),
ok = mria_rlog:wait_for_shards([?RETAINER_SHARD], infinity), ok = mria_rlog:wait_for_shards([?RETAINER_SHARD], infinity),
case mnesia:table_info(?TAB, storage_type) of case mnesia:table_info(?TAB, storage_type) of
Copies -> ok; Copies -> ok;

View File

@ -20,7 +20,7 @@ fields("emqx_retainer") ->
fields(mnesia_config) -> fields(mnesia_config) ->
[ {type, ?TYPE(hoconsc:union([built_in_database]))} [ {type, ?TYPE(hoconsc:union([built_in_database]))}
, {storage_type, sc(hoconsc:union([ram, disc, disc_only]), ram)} , {storage_type, sc(hoconsc:union([ram, disc]), ram)}
, {max_retained_messages, sc(integer(), 0, fun is_pos_integer/1)} , {max_retained_messages, sc(integer(), 0, fun is_pos_integer/1)}
]; ];

View File

@ -59,10 +59,12 @@ statsd(put, #{body := Body}) ->
Body, Body,
#{rawconf_with_defaults => true, override_to => cluster}) of #{rawconf_with_defaults => true, override_to => cluster}) of
{ok, #{raw_config := NewConfig, config := Config}} -> {ok, #{raw_config := NewConfig, config := Config}} ->
_ = emqx_statsd_sup:stop_child(?APP), ok = emqx_statsd_sup:ensure_child_stopped(?APP),
case maps:get(<<"enable">>, Body) of case maps:get(<<"enable">>, Body) of
true -> emqx_statsd_sup:start_child(?APP, maps:get(config, Config)); true ->
false -> ok ok = emqx_statsd_sup:ensure_child_started(?APP, maps:get(config, Config));
false ->
ok
end, end,
{200, NewConfig}; {200, NewConfig};
{error, Reason} -> {error, Reason} ->

View File

@ -34,7 +34,7 @@ stop(_) ->
maybe_enable_statsd() -> maybe_enable_statsd() ->
case emqx_conf:get([statsd, enable], false) of case emqx_conf:get([statsd, enable], false) of
true -> true ->
emqx_statsd_sup:start_child(?APP, emqx_conf:get([statsd], #{})); emqx_statsd_sup:ensure_child_started(?APP, emqx_conf:get([statsd], #{}));
false -> false ->
ok ok
end. end.

View File

@ -8,9 +8,9 @@
-behaviour(supervisor). -behaviour(supervisor).
-export([ start_link/0 -export([ start_link/0
, start_child/1 , ensure_child_started/1
, start_child/2 , ensure_child_started/2
, stop_child/1 , ensure_child_stopped/1
]). ]).
-export([init/1]). -export([init/1]).
@ -26,19 +26,24 @@
start_link() -> start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-spec start_child(supervisor:child_spec()) -> ok. -spec ensure_child_started(supervisor:child_spec()) -> ok.
start_child(ChildSpec) when is_map(ChildSpec) -> ensure_child_started(ChildSpec) when is_map(ChildSpec) ->
assert_started(supervisor:start_child(?MODULE, ChildSpec)). assert_started(supervisor:start_child(?MODULE, ChildSpec)).
-spec start_child(atom(), map()) -> ok. -spec ensure_child_started(atom(), map()) -> ok.
start_child(Mod, Opts) when is_atom(Mod) andalso is_map(Opts) -> ensure_child_started(Mod, Opts) when is_atom(Mod) andalso is_map(Opts) ->
assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, Opts))). assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, Opts))).
-spec(stop_child(any()) -> ok | {error, term()}). %% @doc Stop the child worker process.
stop_child(ChildId) -> -spec ensure_child_stopped(any()) -> ok.
ensure_child_stopped(ChildId) ->
case supervisor:terminate_child(?MODULE, ChildId) of case supervisor:terminate_child(?MODULE, ChildId) of
ok -> supervisor:delete_child(?MODULE, ChildId); ok ->
Error -> Error %% with terminate_child/2 returned 'ok', it's not possible
%% for supervisor:delete_child/2 to return {error, Reason}
ok = supervisor:delete_child(?MODULE, ChildId);
{error, not_found} ->
ok
end. end.
init([]) -> init([]) ->
@ -50,5 +55,5 @@ init([]) ->
assert_started({ok, _Pid}) -> ok; assert_started({ok, _Pid}) -> ok;
assert_started({ok, _Pid, _Info}) -> ok; assert_started({ok, _Pid, _Info}) -> ok;
assert_started({error, {already_tarted, _Pid}}) -> ok; assert_started({error, {already_started, _Pid}}) -> ok;
assert_started({error, Reason}) -> erlang:error(Reason). assert_started({error, Reason}) -> erlang:error(Reason).