diff --git a/apps/emqx_gateway/src/emqx_gateway_conf.erl b/apps/emqx_gateway/src/emqx_gateway_conf.erl index cd1f64871..d2f6921e3 100644 --- a/apps/emqx_gateway/src/emqx_gateway_conf.erl +++ b/apps/emqx_gateway/src/emqx_gateway_conf.erl @@ -275,7 +275,7 @@ ret_gw(GwName, {ok, #{raw_config := GwConf}}) -> end, maps:to_list(SubConf)), [NLConfs | Acc] end, [], maps:to_list(LsConf)), - {ok, maps:merge(GwConf1, #{<<"listeners">> => NLsConf})}; + {ok, maps:merge(GwConf1, #{<<"listeners">> => lists:append(NLsConf)})}; ret_gw(_GwName, Err) -> Err. ret_authn(GwName, {ok, #{raw_config := GwConf}}) -> diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index 95720ff13..cc771ed95 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -46,7 +46,6 @@ ]). -export([ stringfy/1 - , parse_address/1 ]). -export([ normalize_config/1 @@ -332,19 +331,6 @@ stringfy(T) when is_list(T); is_binary(T) -> stringfy(T) -> iolist_to_binary(io_lib:format("~0p", [T])). --spec parse_address(binary() | list()) -> {list(), integer()}. -parse_address(S) when is_binary(S); is_list(S) -> - S1 = case is_binary(S) of - true -> lists:reverse(binary_to_list(S)); - _ -> lists:reverse(S) - end, - case re:split(S1, ":", [{parts, 2}, {return, list}]) of - [Port0, Host0] -> - {lists:reverse(Host0), list_to_integer(lists:reverse(Port0))}; - _ -> - error(badarg) - end. - -spec normalize_config(emqx_config:config()) -> list({ Type :: udp | tcp | ssl | dtls , Name :: atom() diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl index 48dca4324..75057534f 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl @@ -148,13 +148,16 @@ stop_grpc_server(GwName) -> start_grpc_client_channel(_GwName, undefined) -> undefined; start_grpc_client_channel(GwName, Options = #{address := Address}) -> - {Host, Port} = try emqx_gateway_utils:parse_address(Address) - catch error : badarg -> - throw({badconf, #{key => address, - value => Address, - reason => illegal_grpc_address - }}) - end, + #{host := Host, port := Port} = + case emqx_http_lib:uri_parse(Address) of + {ok, URIMap0} -> URIMap0; + {error, _Reason} -> + throw({badconf, #{key => address, + value => Address, + reason => illegal_grpc_address + }}) + + end, case maps:to_list(maps:get(ssl, Options, #{})) of [] -> SvrAddr = compose_http_uri(http, Host, Port), @@ -170,7 +173,7 @@ start_grpc_client_channel(GwName, Options = #{address := Address}) -> compose_http_uri(Scheme, Host, Port) -> lists:flatten( io_lib:format( - "~s://~s:~w", [Scheme, Host, Port])). + "~s://~s:~w", [Scheme, inet:ntoa(Host), Port])). stop_grpc_client_channel(GwName) -> _ = grpc_client_sup:stop_channel_pool(GwName), diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl index 47ed722b1..f4d73879d 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl @@ -92,7 +92,7 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) -> on_gateway_unload(_Gateway = #{ name := GwName, config := Config - }, _GwState = #{registry := RegPid}) -> - exit(RegPid, kill), + }, _GwState = #{registry := _RegPid}) -> + _ = try emqx_lwm2m_xml_object_db:stop() catch _ : _ -> ok end, Listeners = emqx_gateway_utils:normalize_config(Config), emqx_gateway_utils:stop_listeners(GwName, Listeners). diff --git a/apps/emqx_gateway/test/emqx_exproto_SUITE.erl b/apps/emqx_gateway/test/emqx_exproto_SUITE.erl index 993ed4975..69e9afbaf 100644 --- a/apps/emqx_gateway/test/emqx_exproto_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_exproto_SUITE.erl @@ -69,7 +69,7 @@ set_special_cfg(emqx_gateway) -> emqx_config:put( [gateway, exproto], #{server => #{bind => 9100}, - handler => #{address => "127.0.0.1:9001"}, + handler => #{address => "http://127.0.0.1:9001"}, listeners => listener_confs(LisType) }); set_special_cfg(_App) -> diff --git a/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl index f91347a6e..9b303c100 100644 --- a/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl @@ -172,7 +172,7 @@ t_gateway_exproto(_) -> %% post GwConf = #{name => <<"exproto">>, server => #{bind => <<"9100">>}, - handler => #{address => <<"127.0.0.1:9001">>}, + handler => #{address => <<"http://127.0.0.1:9001">>}, listeners => [ #{name => <<"def">>, type => <<"tcp">>, bind => <<"7993">>} ] diff --git a/apps/emqx_management/src/emqx_mgmt_api_app.erl b/apps/emqx_management/src/emqx_mgmt_api_app.erl index 489d679be..4fa9ce469 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_app.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_app.erl @@ -139,13 +139,7 @@ api_key(post, #{body := App}) -> <<"desc">> := Desc0, <<"enable">> := Enable } = App, - %% undefined is never expired - ExpiredAt0 = maps:get(<<"expired_at">>, App, <<"undefined">>), - ExpiredAt = - case ExpiredAt0 of - <<"undefined">> -> undefined; - _ -> ExpiredAt0 - end, + ExpiredAt = ensure_expired_at(App), Desc = unicode:characters_to_binary(Desc0, unicode), case emqx_mgmt_auth:create(Name, Enable, ExpiredAt, Desc) of {ok, NewApp} -> {200, format(NewApp)}; @@ -164,7 +158,7 @@ api_key_by_name(delete, #{bindings := #{name := Name}}) -> end; api_key_by_name(put, #{bindings := #{name := Name}, body := Body}) -> Enable = maps:get(<<"enable">>, Body, undefined), - ExpiredAt = maps:get(<<"expired_at">>, Body, undefined), + ExpiredAt = ensure_expired_at(Body), Desc = maps:get(<<"desc">>, Body, undefined), case emqx_mgmt_auth:update(Name, Enable, ExpiredAt, Desc) of {ok, App} -> {200, format(App)}; @@ -181,3 +175,6 @@ format(App = #{expired_at := ExpiredAt0, created_at := CreateAt}) -> expired_at => ExpiredAt, created_at => list_to_binary(calendar:system_time_to_rfc3339(CreateAt)) }. + +ensure_expired_at(#{<<"expired_at">> := ExpiredAt})when is_integer(ExpiredAt) -> ExpiredAt; +ensure_expired_at(_) -> undefined. diff --git a/apps/emqx_management/src/emqx_mgmt_api_trace.erl b/apps/emqx_management/src/emqx_mgmt_api_trace.erl index 52c0f0309..e03ae21eb 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_trace.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_trace.erl @@ -304,13 +304,16 @@ download_trace_log(get, #{bindings := #{name := Name}}) -> Zips = group_trace_file(ZipDir, TraceLog, TraceFiles), ZipFileName = ZipDir ++ binary_to_list(Name) ++ ".zip", {ok, ZipFile} = zip:zip(ZipFileName, Zips, [{cwd, ZipDir}]), - emqx_trace:delete_files_after_send(ZipFileName, Zips), + %% emqx_trace:delete_files_after_send(ZipFileName, Zips), + %% TODO use file replace file_binary.(delete file after send is not ready now). + {ok, Binary} = file:read_file(ZipFile), + ZipName = filename:basename(ZipFile), + _ = file:delete(ZipFile), Headers = #{ <<"content-type">> => <<"application/x-zip">>, - <<"content-disposition">> => - iolist_to_binary("attachment; filename=" ++ filename:basename(ZipFile)) + <<"content-disposition">> => iolist_to_binary("attachment; filename=" ++ ZipName) }, - {200, Headers, {file, ZipFile}}; + {200, Headers, {file_binary, ZipName, Binary}}; {error, not_found} -> ?NOT_FOUND(Name) end. diff --git a/apps/emqx_management/src/emqx_mgmt_auth.erl b/apps/emqx_management/src/emqx_mgmt_auth.erl index 512ec6b0f..55201dc86 100644 --- a/apps/emqx_management/src/emqx_mgmt_auth.erl +++ b/apps/emqx_management/src/emqx_mgmt_auth.erl @@ -68,11 +68,11 @@ update(Name, Enable, ExpiredAt, Desc) -> Fun = fun() -> case mnesia:read(?APP, Name, write) of [] -> mnesia:abort(not_found); - [App0 = #?APP{enable = Enable0, expired_at = ExpiredAt0, desc = Desc0}] -> + [App0 = #?APP{enable = Enable0, desc = Desc0}] -> App = App0#?APP{ + expired_at = ExpiredAt, enable = ensure_not_undefined(Enable, Enable0), - expired_at = ensure_not_undefined(ExpiredAt, ExpiredAt0), desc = ensure_not_undefined(Desc, Desc0) }, ok = mnesia:write(App), diff --git a/apps/emqx_management/test/emqx_mgmt_auth_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_auth_api_SUITE.erl index 73d4ad566..031e1622f 100644 --- a/apps/emqx_management/test/emqx_mgmt_auth_api_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_auth_api_SUITE.erl @@ -96,6 +96,13 @@ t_update(_Config) -> ?assertEqual(calendar:rfc3339_to_system_time(binary_to_list(ExpiredAt)), calendar:rfc3339_to_system_time(binary_to_list(maps:get(<<"expired_at">>, Update1))) ), + Unexpired1 = maps:without([expired_at], Change), + {ok, Update2} = update_app(Name, Unexpired1), + ?assertEqual(<<"undefined">>, maps:get(<<"expired_at">>, Update2)), + Unexpired2 = Change#{expired_at => <<"undefined">>}, + {ok, Update3} = update_app(Name, Unexpired2), + ?assertEqual(<<"undefined">>, maps:get(<<"expired_at">>, Update3)), + ?assertEqual({error, {"HTTP/1.1", 404, "Not Found"}}, update_app(<<"Not-Exist">>, Change)), ok. @@ -137,6 +144,10 @@ t_authorize(_Config) -> }, ?assertMatch({ok, #{<<"api_key">> := _, <<"enable">> := true}}, update_app(Name, Expired)), ?assertEqual(Unauthorized, emqx_mgmt_api_test_util:request_api(get, BanPath, BasicHeader)), + UnExpired = #{expired_at => undefined}, + ?assertMatch({ok, #{<<"api_key">> := _, <<"expired_at">> := <<"undefined">>}}, + update_app(Name, UnExpired)), + {ok, _Status1} = emqx_mgmt_api_test_util:request_api(get, BanPath, BasicHeader), ok. t_create_unexpired_app(_Config) -> diff --git a/apps/emqx_retainer/README.md b/apps/emqx_retainer/README.md index 84d777bbc..b3065fa9c 100644 --- a/apps/emqx_retainer/README.md +++ b/apps/emqx_retainer/README.md @@ -13,10 +13,9 @@ etc/emqx_retainer.conf: ## Where to store the retained messages. ## Notice that all nodes in a cluster are to have the same storage_type. ## -## Value: ram | disc | disc_only +## Value: ram | disc ## - ram: memory only ## - disc: both memory and disc -## - disc_only: disc only ## ## Default: ram retainer.storage_type = ram @@ -56,4 +55,3 @@ Author ------ EMQ X Team - diff --git a/apps/emqx_retainer/etc/emqx_retainer.conf b/apps/emqx_retainer/etc/emqx_retainer.conf index 92dc62f24..5824186d0 100644 --- a/apps/emqx_retainer/etc/emqx_retainer.conf +++ b/apps/emqx_retainer/etc/emqx_retainer.conf @@ -74,7 +74,7 @@ emqx_retainer { type = built_in_database - ## storage_type: ram | disc | disc_only + ## storage_type: ram | disc storage_type = ram ## Maximum number of retained messages. 0 means no limit. diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index eb08bf6cc..afea2a4b0 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -45,24 +45,22 @@ create_resource(#{storage_type := StorageType}) -> Copies = case StorageType of ram -> ram_copies; - disc -> disc_copies; - disc_only -> disc_only_copies + disc -> disc_copies end, - TableType = case StorageType of - disc_only -> set; - _ -> ordered_set - end, + StoreProps = [{ets, [compressed, {read_concurrency, true}, {write_concurrency, true}]}, {dets, [{auto_save, 1000}]}], + ok = mria:create_table(?TAB, [ - {type, TableType}, - {rlog_shard, ?RETAINER_SHARD}, - {storage, Copies}, - {record_name, retained}, - {attributes, record_info(fields, retained)}, - {storage_properties, StoreProps}]), + {type, ordered_set}, + {rlog_shard, ?RETAINER_SHARD}, + {storage, Copies}, + {record_name, retained}, + {attributes, record_info(fields, retained)}, + {storage_properties, StoreProps} + ]), ok = mria_rlog:wait_for_shards([?RETAINER_SHARD], infinity), case mnesia:table_info(?TAB, storage_type) of Copies -> ok; diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl index e1fa8373a..d6bc598a7 100644 --- a/apps/emqx_retainer/src/emqx_retainer_schema.erl +++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl @@ -20,7 +20,7 @@ fields("emqx_retainer") -> fields(mnesia_config) -> [ {type, ?TYPE(hoconsc:union([built_in_database]))} - , {storage_type, sc(hoconsc:union([ram, disc, disc_only]), ram)} + , {storage_type, sc(hoconsc:union([ram, disc]), ram)} , {max_retained_messages, sc(integer(), 0, fun is_pos_integer/1)} ]; diff --git a/apps/emqx_statsd/src/emqx_statsd_api.erl b/apps/emqx_statsd/src/emqx_statsd_api.erl index d545003b0..ab9416266 100644 --- a/apps/emqx_statsd/src/emqx_statsd_api.erl +++ b/apps/emqx_statsd/src/emqx_statsd_api.erl @@ -59,10 +59,12 @@ statsd(put, #{body := Body}) -> Body, #{rawconf_with_defaults => true, override_to => cluster}) of {ok, #{raw_config := NewConfig, config := Config}} -> - _ = emqx_statsd_sup:stop_child(?APP), + ok = emqx_statsd_sup:ensure_child_stopped(?APP), case maps:get(<<"enable">>, Body) of - true -> emqx_statsd_sup:start_child(?APP, maps:get(config, Config)); - false -> ok + true -> + ok = emqx_statsd_sup:ensure_child_started(?APP, maps:get(config, Config)); + false -> + ok end, {200, NewConfig}; {error, Reason} -> diff --git a/apps/emqx_statsd/src/emqx_statsd_app.erl b/apps/emqx_statsd/src/emqx_statsd_app.erl index 820a86c04..c0a17fc5b 100644 --- a/apps/emqx_statsd/src/emqx_statsd_app.erl +++ b/apps/emqx_statsd/src/emqx_statsd_app.erl @@ -34,7 +34,7 @@ stop(_) -> maybe_enable_statsd() -> case emqx_conf:get([statsd, enable], false) of true -> - emqx_statsd_sup:start_child(?APP, emqx_conf:get([statsd], #{})); + emqx_statsd_sup:ensure_child_started(?APP, emqx_conf:get([statsd], #{})); false -> ok end. diff --git a/apps/emqx_statsd/src/emqx_statsd_sup.erl b/apps/emqx_statsd/src/emqx_statsd_sup.erl index 02b50e3ca..983cf97d4 100644 --- a/apps/emqx_statsd/src/emqx_statsd_sup.erl +++ b/apps/emqx_statsd/src/emqx_statsd_sup.erl @@ -8,9 +8,9 @@ -behaviour(supervisor). -export([ start_link/0 - , start_child/1 - , start_child/2 - , stop_child/1 + , ensure_child_started/1 + , ensure_child_started/2 + , ensure_child_stopped/1 ]). -export([init/1]). @@ -26,19 +26,24 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). --spec start_child(supervisor:child_spec()) -> ok. -start_child(ChildSpec) when is_map(ChildSpec) -> +-spec ensure_child_started(supervisor:child_spec()) -> ok. +ensure_child_started(ChildSpec) when is_map(ChildSpec) -> assert_started(supervisor:start_child(?MODULE, ChildSpec)). --spec start_child(atom(), map()) -> ok. -start_child(Mod, Opts) when is_atom(Mod) andalso is_map(Opts) -> +-spec ensure_child_started(atom(), map()) -> ok. +ensure_child_started(Mod, Opts) when is_atom(Mod) andalso is_map(Opts) -> assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, Opts))). --spec(stop_child(any()) -> ok | {error, term()}). -stop_child(ChildId) -> +%% @doc Stop the child worker process. +-spec ensure_child_stopped(any()) -> ok. +ensure_child_stopped(ChildId) -> case supervisor:terminate_child(?MODULE, ChildId) of - ok -> supervisor:delete_child(?MODULE, ChildId); - Error -> Error + ok -> + %% with terminate_child/2 returned 'ok', it's not possible + %% for supervisor:delete_child/2 to return {error, Reason} + ok = supervisor:delete_child(?MODULE, ChildId); + {error, not_found} -> + ok end. init([]) -> @@ -50,5 +55,5 @@ init([]) -> assert_started({ok, _Pid}) -> ok; assert_started({ok, _Pid, _Info}) -> ok; -assert_started({error, {already_tarted, _Pid}}) -> ok; +assert_started({error, {already_started, _Pid}}) -> ok; assert_started({error, Reason}) -> erlang:error(Reason).