diff --git a/apps/emqx_auth_http/src/emqx_auth_http.appup.src b/apps/emqx_auth_http/src/emqx_auth_http.appup.src index 8ebc195dd..b6d11dd8c 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http.appup.src +++ b/apps/emqx_auth_http/src/emqx_auth_http.appup.src @@ -1,26 +1,16 @@ -%% -*-: erlang -*- - +%% -*- mode: erlang -*- {VSN, - [ - {"4.3.2", [ - {apply, {application, stop, [emqx_auth_http]}}, - {load_module, emqx_auth_http_app, brutal_purge, soft_purge,[]}, - {load_module, emqx_auth_http_cli, brutal_purge, soft_purge,[]} - ]}, - {<<"4.3.[0-1]">>, [ - {restart_application, emqx_auth_http} - ]}, - {<<".*">>, []} - ], - [ - {"4.3.2", [ - {apply, {application, stop, [emqx_auth_http]}}, - {load_module, emqx_auth_http_app, brutal_purge, soft_purge,[]}, - {load_module, emqx_auth_http_cli, brutal_purge, soft_purge,[]} - ]}, - {<<"4.3.[0-1]">>, [ - {restart_application, emqx_auth_http} - ]}, - {<<".*">>, []} - ] -}. + [{"4.3.2", + [{apply,{application,stop,[emqx_auth_http]}}, + {load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]}, + {<<"4.3.[0-1]">>, + [{restart_application,emqx_auth_http}]}, + {<<".*">>,[]}], + [{"4.3.2", + [{apply,{application,stop,[emqx_auth_http]}}, + {load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]}, + {<<"4.3.[0-1]">>, + [{restart_application,emqx_auth_http}]}, + {<<".*">>,[]}]}. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src index 650c9d7fa..e6d1c55c4 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_mqtt, [{description, "EMQ X Bridge to MQTT Broker"}, - {vsn, "4.3.2"}, % strict semver, bump manually! + {vsn, "4.3.3"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [kernel,stdlib,replayq,emqtt]}, diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src index b32d747f4..3b50949cd 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src @@ -1,18 +1,24 @@ %% -*-: erlang -*- -{VSN, +{"4.3.3", [ + {<<"4.3.[1-2]">>, [ + {load_module, emqx_bridge_mqtt_actions, brutal_purge, soft_purge, []} + ]}, {"4.3.0", [ - {load_module, emqx_bridge_worker, brutal_purge, soft_purge, []} - ]}, - {"4.3.1", []}, + {load_module, emqx_bridge_worker, brutal_purge, soft_purge, []}, + {load_module, emqx_bridge_mqtt_actions, brutal_purge, soft_purge, []} + ]}, {<<".*">>, []} ], [ - {"4.3.0", [ - {load_module, emqx_bridge_worker, brutal_purge, soft_purge, []} + {<<"4.3.[1-2]">>, [ + {load_module, emqx_bridge_mqtt_actions, brutal_purge, soft_purge, []} + ]}, + {"4.3.0", [ + {load_module, emqx_bridge_worker, brutal_purge, soft_purge, []}, + {load_module, emqx_bridge_mqtt_actions, brutal_purge, soft_purge, []} ]}, - {"4.3.1", []}, {<<".*">>, []} ] }. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl index 3f685a72a..21cda5b6d 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl @@ -410,21 +410,35 @@ start_resource(ResId, PoolName, Options) -> end. test_resource_status(PoolName) -> - IsConnected = fun(Worker) -> - case ecpool_worker:client(Worker) of - {ok, Bridge} -> - try emqx_bridge_worker:status(Bridge) of - connected -> true; - _ -> false - catch _Error:_Reason -> - false - end; - {error, _} -> - false - end - end, - Status = [IsConnected(Worker) || {_WorkerName, Worker} <- ecpool:workers(PoolName)], - lists:any(fun(St) -> St =:= true end, Status). + Parent = self(), + Pids = [spawn(fun() -> Parent ! {self(), get_worker_status(Worker)} end) + || {_WorkerName, Worker} <- ecpool:workers(PoolName)], + try + Status = [ + receive {Pid, R} -> R + after 1000 -> %% get_worker_status/1 should be a quick operation + throw({timeout, Pid}) + end || Pid <- Pids], + lists:any(fun(St) -> St =:= true end, Status) + catch + throw:Reason -> + ?LOG(error, "Get mqtt bridge status timeout: ~p", [Reason]), + lists:foreach(fun(Pid) -> exit(Pid, kill) end, Pids), + false + end. + +get_worker_status(Worker) -> + case ecpool_worker:client(Worker) of + {ok, Bridge} -> + try emqx_bridge_worker:status(Bridge) of + connected -> true; + _ -> false + catch _Error:_Reason -> + false + end; + {error, _} -> + false + end. -spec(on_get_resource_status(ResId::binary(), Params::map()) -> Status::map()). on_get_resource_status(_ResId, #{<<"pool">> := PoolName}) -> @@ -433,13 +447,13 @@ on_get_resource_status(_ResId, #{<<"pool">> := PoolName}) -> on_resource_destroy(ResId, #{<<"pool">> := PoolName}) -> ?LOG(info, "Destroying Resource ~p, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]), - case ecpool:stop_sup_pool(PoolName) of - ok -> - ?LOG(info, "Destroyed Resource ~p Successfully, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]); - {error, Reason} -> - ?LOG(error, "Destroy Resource ~p failed, ResId: ~p, ~p", [?RESOURCE_TYPE_MQTT, ResId, Reason]), - error({{?RESOURCE_TYPE_MQTT, ResId}, destroy_failed}) - end. + case ecpool:stop_sup_pool(PoolName) of + ok -> + ?LOG(info, "Destroyed Resource ~p Successfully, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]); + {error, Reason} -> + ?LOG(error, "Destroy Resource ~p failed, ResId: ~p, ~p", [?RESOURCE_TYPE_MQTT, ResId, Reason]), + error({{?RESOURCE_TYPE_MQTT, ResId}, destroy_failed}) + end. on_action_create_data_to_mqtt_broker(ActId, Opts = #{<<"pool">> := PoolName, <<"forward_topic">> := ForwardTopic, diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src b/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src index 6656eb149..c2b7e7bc4 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src +++ b/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src @@ -1,27 +1,16 @@ -%% -*-: erlang -*- +%% -*- mode: erlang -*- {VSN, - [ - {<<"4\\.3\\.[0-1]">>, [ - {restart_application, emqx_lwm2m} - ]}, - {"4.3.2", [ - {load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []} - ]}, - {"4.3.3", []}, %% only config change - {"4.3.4", [ - {load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []} - ]} - ], - [ - {<<"4\\.3\\.[0-1]">>, [ - {restart_application, emqx_lwm2m} - ]}, - {"4.3.2", [ - {load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []} - ]}, - {"4.3.3", []}, %% only config change - {"4.3.4", [ - {load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []} - ]} - ] -}. + [{<<"4\\.3\\.[0-1]">>, + [{restart_application,emqx_lwm2m}]}, + {"4.3.2", + [{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]}, + {load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]}]}, + {"4.3.3",[{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]}]}, + {"4.3.4",[{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]}]}], + [{<<"4\\.3\\.[0-1]">>, + [{restart_application,emqx_lwm2m}]}, + {"4.3.2", + [{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]}, + {load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]}]}, + {"4.3.3",[{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]}]}, + {"4.3.4",[{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]}]}]}. diff --git a/apps/emqx_management/src/emqx_management.appup.src b/apps/emqx_management/src/emqx_management.appup.src index e50724d6d..f0478add1 100644 --- a/apps/emqx_management/src/emqx_management.appup.src +++ b/apps/emqx_management/src/emqx_management.appup.src @@ -1,13 +1,13 @@ %% -*- mode: erlang -*- {VSN, - [ {<<"4\\.3\\.[0-7]+">>, + [ {<<"4\\.3\\.[0-8]+">>, [ {apply,{minirest,stop_http,['http:management']}}, {apply,{minirest,stop_http,['https:management']}}, {restart_application, emqx_management} ]}, {<<".*">>, []} ], - [ {<<"4\\.3\\.[0-7]+">>, + [ {<<"4\\.3\\.[0-8]+">>, [ {apply,{minirest,stop_http,['http:management']}}, {apply,{minirest,stop_http,['https:management']}}, {restart_application, emqx_management} diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 61c32613b..8f871ecfc 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -124,6 +124,7 @@ -define(APP, emqx_management). +-elvis([{elvis_style, god_modules, disable}]). %%-------------------------------------------------------------------- %% Node Info %%-------------------------------------------------------------------- @@ -143,7 +144,7 @@ node_info(Node) when Node =:= node() -> Info#{node => node(), otp_release => iolist_to_binary(otp_rel()), memory_total => proplists:get_value(allocated, Memory), - memory_used => proplists:get_value(total, Memory), + memory_used => proplists:get_value(used, Memory), process_available => erlang:system_info(process_limit), process_used => erlang:system_info(process_count), max_fds => proplists:get_value(max_fds, @@ -298,7 +299,7 @@ call_client(Node, ClientId, Req) when Node =:= node() -> Pid = lists:last(Pids), case emqx_cm:get_chan_info(ClientId, Pid) of #{conninfo := #{conn_mod := ConnMod}} -> - ConnMod:call(Pid, Req); + erlang:apply(ConnMod, call, [Pid, Req]); undefined -> {error, not_found} end end; @@ -322,9 +323,10 @@ list_subscriptions_via_topic(Topic, FormatFun) -> lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun) || Node <- ekka_mnesia:running_nodes()]). + list_subscriptions_via_topic(Node, Topic, {M,F}) when Node =:= node() -> MatchSpec = [{{{'_', '$1'}, '_'}, [{'=:=','$1', Topic}], ['$_']}], - M:F(ets:select(emqx_suboption, MatchSpec)); + erlang:apply(M, F, [ets:select(emqx_suboption, MatchSpec)]); list_subscriptions_via_topic(Node, Topic, FormatFun) -> rpc_call(Node, list_subscriptions_via_topic, [Node, Topic, FormatFun]). diff --git a/apps/emqx_retainer/etc/emqx_retainer.conf b/apps/emqx_retainer/etc/emqx_retainer.conf index 0a883cee5..8b5c251c6 100644 --- a/apps/emqx_retainer/etc/emqx_retainer.conf +++ b/apps/emqx_retainer/etc/emqx_retainer.conf @@ -39,3 +39,11 @@ retainer.max_payload_size = 1MB ## ## Defaut: 0 retainer.expiry_interval = 0 + +## When the retained flag of the PUBLISH message is set and Payload is empty, +## whether to continue to publish the message. +## see: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038 +## +## Value: Boolean +## Default: false +#retainer.stop_publish_clear_msg = false diff --git a/apps/emqx_retainer/priv/emqx_retainer.schema b/apps/emqx_retainer/priv/emqx_retainer.schema index e598864e1..5e998aa16 100644 --- a/apps/emqx_retainer/priv/emqx_retainer.schema +++ b/apps/emqx_retainer/priv/emqx_retainer.schema @@ -28,3 +28,10 @@ {default, 0}, {datatype, [integer, {duration, ms}]} ]}. + +%% Stop publish clear message +%% {$configurable} +{mapping, "retainer.stop_publish_clear_msg", "emqx_retainer.stop_publish_clear_msg", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. diff --git a/apps/emqx_retainer/src/emqx_retainer.appup.src b/apps/emqx_retainer/src/emqx_retainer.appup.src index a17e6ee2f..19e8e835f 100644 --- a/apps/emqx_retainer/src/emqx_retainer.appup.src +++ b/apps/emqx_retainer/src/emqx_retainer.appup.src @@ -1,13 +1,13 @@ %% -*-: erlang -*- {VSN, [ - {"4.3.0", [ + {<<"4\\.3\\.[0-1]+">>, [ {load_module, emqx_retainer, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} ], [ - {"4.3.0", [ + {<<"4\\.3\\.[0-1]+">>, [ {load_module, emqx_retainer, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index b6de9d9f6..b99db5092 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -84,9 +84,14 @@ dispatch(Pid, Topic) -> %% RETAIN flag set to 1 and payload containing zero bytes on_message_publish(Msg = #message{flags = #{retain := true}, topic = Topic, - payload = <<>>}, _Env) -> + payload = <<>>}, Env) -> mnesia:dirty_delete(?TAB, topic2tokens(Topic)), - {ok, Msg}; + case stop_publish_clear_msg(Env) of + true -> + {ok, emqx_message:set_header(allow_publish, false, Msg)}; + _ -> + {ok, Msg} + end; on_message_publish(Msg = #message{flags = #{retain := true}}, Env) -> Msg1 = emqx_message:set_header(retained, true, Msg), @@ -229,6 +234,9 @@ store_retained(Msg = #message{topic = Topic, payload = Payload}, Env) -> "for payload is too big!", [Topic, iolist_size(Payload)]) end. +stop_publish_clear_msg(Env) -> + proplists:get_bool(stop_publish_clear_msg, Env). + is_table_full(Env) -> Limit = proplists:get_value(max_retained_messages, Env, 0), Limit > 0 andalso (retained_count() > Limit). diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index 1df042dd9..28f667b53 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -42,6 +42,8 @@ init_per_testcase(TestCase, Config) -> case TestCase of t_message_expiry_2 -> application:set_env(emqx_retainer, expiry_interval, 2000); + t_stop_publish_clear_msg -> + application:set_env(emqx_retainer, stop_publish_clear_msg, true); _ -> application:set_env(emqx_retainer, expiry_interval, 0) end, @@ -173,6 +175,19 @@ t_clean(_) -> ok = emqtt:disconnect(C1). +t_stop_publish_clear_msg(_) -> + {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), + emqtt:publish(C1, <<"retained/0">>, <<"this is a retained message 0">>, [{qos, 0}, {retain, true}]), + + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), + ?assertEqual(1, length(receive_messages(1))), + + emqtt:publish(C1, <<"retained/0">>, <<"">>, [{qos, 0}, {retain, true}]), + ?assertEqual(0, length(receive_messages(1))), + + ok = emqtt:disconnect(C1). + %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- diff --git a/apps/emqx_sn/src/emqx_sn.app.src b/apps/emqx_sn/src/emqx_sn.app.src index c48a76496..a36166eae 100644 --- a/apps/emqx_sn/src/emqx_sn.app.src +++ b/apps/emqx_sn/src/emqx_sn.app.src @@ -1,6 +1,6 @@ {application, emqx_sn, [{description, "EMQ X MQTT-SN Plugin"}, - {vsn, "4.3.3"}, % strict semver, bump manually! + {vsn, "4.3.4"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [kernel,stdlib,esockd]}, diff --git a/apps/emqx_sn/src/emqx_sn.appup.src b/apps/emqx_sn/src/emqx_sn.appup.src index 2bd6f5646..0fbd16b1b 100644 --- a/apps/emqx_sn/src/emqx_sn.appup.src +++ b/apps/emqx_sn/src/emqx_sn.appup.src @@ -1,18 +1,24 @@ %% -*-: erlang -*- {VSN, [ - {"4.3.2", [ - {load_module, emqx_sn_gateway, brutal_purge, soft_purge, []} - ]}, - {<<"4.3.[0-1]">>, [ + {"4.3.3", [ + {load_module, emqx_sn_registry, brutal_purge, soft_purge, []} + ]}, + {"4.3.2", [ + {load_module, emqx_sn_gateway, brutal_purge, soft_purge, []} + ]}, + {<<"4\\.3\\.[0-1]">>, [ {restart_application, emqx_sn} ]} ], [ - {"4.3.2", [ - {load_module, emqx_sn_gateway, brutal_purge, soft_purge, []} + {"4.3.3", [ + {load_module, emqx_sn_registry, brutal_purge, soft_purge, []} ]}, - {<<"4.3.[0-1]">>, [ + {"4.3.2", [ + {load_module, emqx_sn_gateway, brutal_purge, soft_purge, []} + ]}, + {<<"4\\.3\\.[0-1]">>, [ {restart_application, emqx_sn} ]} ] diff --git a/apps/emqx_sn/src/emqx_sn_registry.erl b/apps/emqx_sn/src/emqx_sn_registry.erl index 4a3b22585..c97a49f5a 100644 --- a/apps/emqx_sn/src/emqx_sn_registry.erl +++ b/apps/emqx_sn/src/emqx_sn_registry.erl @@ -44,32 +44,16 @@ , code_change/3 ]). +-ifdef(TEST). +-export([create_table/0]). +-endif. + -define(TAB, ?MODULE). -record(state, {max_predef_topic_id = 0}). -record(emqx_sn_registry, {key, value}). -%% Mnesia bootstrap --export([mnesia/1]). - --boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). - - -%% @doc Create or replicate tables. --spec(mnesia(boot | copy) -> ok). -mnesia(boot) -> - %% Optimize storage - StoreProps = [{ets, [{read_concurrency, true}]}], - ok = ekka_mnesia:create_table(?MODULE, [ - {attributes, record_info(fields, emqx_sn_registry)}, - {ram_copies, [node()]}, - {storage_properties, StoreProps}]); - -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?MODULE, ram_copies). - %%----------------------------------------------------------------------------- -spec(start_link(list()) -> {ok, pid()} | ignore | {error, Reason :: term()}). @@ -123,6 +107,7 @@ unregister_topic(ClientId) -> %%----------------------------------------------------------------------------- init([PredefTopics]) -> + create_table(), %% {predef, TopicId} -> TopicName %% {predef, TopicName} -> TopicId %% {ClientId, TopicId} -> TopicName @@ -137,6 +122,15 @@ init([PredefTopics]) -> end, 0, PredefTopics), {ok, #state{max_predef_topic_id = MaxPredefId}}. +create_table() -> + %% Optimize storage + StoreProps = [{ets, [{read_concurrency, true}]}], + ok = ekka_mnesia:create_table(?MODULE, [ + {attributes, record_info(fields, emqx_sn_registry)}, + {ram_copies, [node()]}, + {storage_properties, StoreProps}]), + ok = ekka_mnesia:copy_table(?MODULE, ram_copies). + handle_call({register, ClientId, TopicName}, _From, State = #state{max_predef_topic_id = PredefId}) -> case lookup_topic_id(ClientId, TopicName) of diff --git a/apps/emqx_sn/test/emqx_sn_registry_SUITE.erl b/apps/emqx_sn/test/emqx_sn_registry_SUITE.erl index 8d320d8ed..023d83ae5 100644 --- a/apps/emqx_sn/test/emqx_sn_registry_SUITE.erl +++ b/apps/emqx_sn/test/emqx_sn_registry_SUITE.erl @@ -42,7 +42,7 @@ end_per_suite(_Config) -> init_per_testcase(_TestCase, Config) -> ekka_mnesia:start(), - emqx_sn_registry:mnesia(boot), + emqx_sn_registry:create_table(), mnesia:clear_table(emqx_sn_registry), PredefTopics = application:get_env(emqx_sn, predefined, []), {ok, _Pid} = ?REGISTRY:start_link(PredefTopics), @@ -118,4 +118,3 @@ register_a_lot(N, Max) when N < Max -> Topic = iolist_to_binary(["Topic", integer_to_list(N)]), ?assertEqual(N, ?REGISTRY:register_topic(<<"ClientId">>, Topic)), register_a_lot(N+1, Max). - diff --git a/apps/emqx_stomp/src/emqx_stomp.appup.src b/apps/emqx_stomp/src/emqx_stomp.appup.src index a29390bc1..0b5372dc9 100644 --- a/apps/emqx_stomp/src/emqx_stomp.appup.src +++ b/apps/emqx_stomp/src/emqx_stomp.appup.src @@ -1,16 +1,16 @@ %% -*- mode: erlang -*- {VSN, [{"4.3.2",[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}]}, - {"4.3.1",[ - {load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}, + {"4.3.1", + [{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}, {load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]}, {"4.3.0", [{restart_application,emqx_stomp}, {apply,{emqx_stomp,force_clear_after_app_stoped,[]}}]}, {<<".*">>,[]}], [{"4.3.2",[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}]}, - {"4.3.1",[ - {load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}, + {"4.3.1", + [{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}, {load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]}, {"4.3.0", [{restart_application,emqx_stomp}]}, diff --git a/apps/emqx_web_hook/src/emqx_web_hook.appup.src b/apps/emqx_web_hook/src/emqx_web_hook.appup.src index 40bdfbdbf..5c801d24a 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook.appup.src +++ b/apps/emqx_web_hook/src/emqx_web_hook.appup.src @@ -4,25 +4,31 @@ [{apply,{application,stop,[emqx_web_hook]}}, {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, - {"4.3.5",[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, + {"4.3.5", + [{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, {<<"4.3.[0-2]">>, [{apply,{application,stop,[emqx_web_hook]}}, {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, {<<"4.3.[3-4]">>, - [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, + [{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.3.7", [{apply,{application,stop,[emqx_web_hook]}}, {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, - {"4.3.5",[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, + {"4.3.5", + [{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, {<<"4.3.[0-2]">>, [{apply,{application,stop,[emqx_web_hook]}}, {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, {<<"4.3.[3-4]">>, - [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, + [{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}]}. diff --git a/etc/emqx.conf b/etc/emqx.conf index de5c062a4..d6bfcb137 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -668,9 +668,11 @@ mqtt.max_packet_size = 1MB mqtt.max_clientid_len = 65535 ## Maximum topic levels allowed. 0 means no limit. +## Depth so big may lead to subscribing performance issues. ## -## Value: Number -mqtt.max_topic_levels = 0 +## Value: Number [0-65535] +## Default: 128 +mqtt.max_topic_levels = 128 ## Maximum QoS allowed. ## @@ -774,8 +776,9 @@ zone.external.force_gc_policy = 16000|16MB ## Maximum topic levels allowed. 0 means no limit. ## -## Value: Number -## zone.external.max_topic_levels = 7 +## Value: Number [0-65535] +## Default: 7 +zone.external.max_topic_levels = 7 ## Maximum QoS allowed. ## diff --git a/priv/emqx.schema b/priv/emqx.schema index facd86455..0aa77865c 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -830,7 +830,7 @@ end}. %% @doc Set the Maximum topic levels. {mapping, "mqtt.max_topic_levels", "emqx.max_topic_levels", [ - {default, 0}, + {default, 128}, {datatype, integer} ]}. diff --git a/rebar.config b/rebar.config index cd2cf3a64..da51eec95 100644 --- a/rebar.config +++ b/rebar.config @@ -47,7 +47,7 @@ , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.7.0"}}} , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.6"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.7"}}} - , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}} + , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}} , {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.2"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}} , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3.1"}}} diff --git a/scripts/update_appup.escript b/scripts/update_appup.escript index 39a71b492..7674a941c 100755 --- a/scripts/update_appup.escript +++ b/scripts/update_appup.escript @@ -99,8 +99,26 @@ main(Options, Baseline) -> [] -> ok; _ -> - set_invalid(), - log("ERROR: The appup files are incomplete. Missing changes:~n ~p", [AppupChanges]) + Diffs = + lists:filtermap( + fun({App, {Upgrade, Downgrade, OldUpgrade, OldDowngrade}}) -> + case parse_appup_diffs(Upgrade, OldUpgrade, + Downgrade, OldDowngrade) of + ok -> + false; + {diffs, Diffs} -> + {true, {App, Diffs}} + end + end, + AppupChanges), + case Diffs =:= [] of + true -> + ok; + false -> + set_invalid(), + log("ERROR: The appup files are incomplete. Missing changes:~n ~p", + [Diffs]) + end end; false -> update_appups(AppupChanges) @@ -154,7 +172,7 @@ download_prev_release(Tag, #{binary_rel_url := {ok, URL0}, clone_url := Repo}) - Dir = filename:basename(Repo, ".git") ++ [$-|Tag], Filename = filename:join(BaseDir, Dir), Script = "mkdir -p ${OUTFILE} && - wget -O ${OUTFILE}.zip ${URL} && + wget -c -O ${OUTFILE}.zip ${URL} && unzip -n -d ${OUTFILE} ${OUTFILE}.zip", Env = [{"TAG", Tag}, {"OUTFILE", Filename}, {"URL", URL}], bash(Script, Env), @@ -189,9 +207,52 @@ find_appup_actions(App, CurrAppIdx, PrevAppIdx = #app{version = PrevVersion}) -> %% The appup file has been already updated: []; true -> - [{App, {Upgrade, Downgrade}}] + [{App, {Upgrade, Downgrade, OldUpgrade, OldDowngrade}}] end. +%% For external dependencies, show only the changes that are missing +%% in their current appup. +diff_appup_instructions(ComputedChanges, PresentChanges) -> + lists:foldr( + fun({Vsn, ComputedActions}, Acc) -> + case find_matching_version(Vsn, PresentChanges) of + undefined -> + [{Vsn, ComputedActions} | Acc]; + PresentActions -> + DiffActions = ComputedActions -- PresentActions, + case DiffActions of + [] -> + %% no diff + Acc; + _ -> + [{Vsn, DiffActions} | Acc] + end + end + end, + [], + ComputedChanges). + +%% For external dependencies, checks if any missing diffs are present +%% and groups them by `up' and `down' types. +parse_appup_diffs(Upgrade, OldUpgrade, Downgrade, OldDowngrade) -> + DiffUp = diff_appup_instructions(Upgrade, OldUpgrade), + DiffDown = diff_appup_instructions(Downgrade, OldDowngrade), + case {DiffUp, DiffDown} of + {[], []} -> + %% no diff for external dependency; ignore + ok; + _ -> + set_invalid(), + Diffs = #{ up => DiffUp + , down => DiffDown + }, + {diffs, Diffs} + end. + +%% TODO: handle regexes +find_matching_version(Vsn, PresentChanges) -> + proplists:get_value(Vsn, PresentChanges). + find_old_appup_actions(App, PrevVersion) -> {Upgrade0, Downgrade0} = case locate(ebin_current, App, ".appup") of @@ -270,12 +331,12 @@ check_appup_files() -> update_appups(Changes) -> lists:foreach( - fun({App, {Upgrade, Downgrade}}) -> - do_update_appup(App, Upgrade, Downgrade) + fun({App, {Upgrade, Downgrade, OldUpgrade, OldDowngrade}}) -> + do_update_appup(App, Upgrade, Downgrade, OldUpgrade, OldDowngrade) end, Changes). -do_update_appup(App, Upgrade, Downgrade) -> +do_update_appup(App, Upgrade, Downgrade, OldUpgrade, OldDowngrade) -> case locate(src, App, ".appup.src") of {ok, AppupFile} -> render_appfile(AppupFile, Upgrade, Downgrade); @@ -284,8 +345,16 @@ do_update_appup(App, Upgrade, Downgrade) -> {ok, AppupFile} -> render_appfile(AppupFile, Upgrade, Downgrade); false -> - set_invalid(), - log("ERROR: Appup file for the external dependency '~p' is not complete.~n Missing changes: ~p~n", [App, Upgrade]) + case parse_appup_diffs(Upgrade, OldUpgrade, + Downgrade, OldDowngrade) of + ok -> + %% no diff for external dependency; ignore + ok; + {diffs, Diffs} -> + set_invalid(), + log("ERROR: Appup file for the external dependency '~p' is not complete.~n Missing changes: ~100p~n", [App, Diffs]), + log("NOTE: Some changes above might be already covered by regexes.~n") + end end end. @@ -298,12 +367,14 @@ render_appfile(File, Upgrade, Downgrade) -> ok = file:write_file(File, IOList). create_stub(App) -> - case locate(src, App, ".app.src") of + case locate(src, App, Ext = ".app.src") of {ok, AppSrc} -> - AppupFile = filename:basename(AppSrc) ++ ".appup.src", + DirName = filename:dirname(AppSrc), + AppupFile = filename:basename(AppSrc, Ext) ++ ".appup.src", Default = {<<".*">>, []}, - render_appfile(AppupFile, [Default], [Default]), - AppupFile; + AppupFileFullpath = filename:join(DirName, AppupFile), + render_appfile(AppupFileFullpath, [Default], [Default]), + {ok, AppupFileFullpath}; undefined -> false end. diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 653249ccb..c510b3043 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -1,13 +1,19 @@ %% -*- mode: erlang -*- {VSN, [{"4.3.11", - [{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]}, + [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]}, {"4.3.10", - [{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, + [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, {"4.3.9", - [{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, + [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, @@ -18,7 +24,9 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.8", - [{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, + [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, @@ -29,7 +37,9 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.7", - [{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, + [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, @@ -42,7 +52,9 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.6", - [{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, + [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, @@ -56,7 +68,9 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.5", - [{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, + [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, @@ -71,7 +85,9 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.4", - [{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, + [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, @@ -87,7 +103,9 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.3", - [{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, + [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, @@ -104,7 +122,9 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.2", - [{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, @@ -121,7 +141,9 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.1", - [{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, @@ -142,7 +164,9 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.0", - [{load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, @@ -168,13 +192,19 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.3.11", - [{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]}, + [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]}, {"4.3.10", - [{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, + [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, {"4.3.9", - [{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, + [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, @@ -185,7 +215,9 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.8", - [{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, + [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, @@ -196,7 +228,9 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.7", - [{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, + [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, @@ -209,7 +243,9 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.6", - [{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, + [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, @@ -223,7 +259,9 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.5", - [{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, + [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, @@ -238,7 +276,9 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.4", - [{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, + [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, @@ -254,7 +294,9 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.3", - [{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, + [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, @@ -271,7 +313,9 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.2", - [{load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, @@ -288,7 +332,9 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.1", - [{load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, @@ -309,7 +355,9 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.0", - [{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, diff --git a/src/emqx_sys_mon.erl b/src/emqx_sys_mon.erl index 152f975eb..710895bcd 100644 --- a/src/emqx_sys_mon.erl +++ b/src/emqx_sys_mon.erl @@ -182,7 +182,15 @@ procinfo(Pid) -> case {emqx_vm:get_process_info(Pid), emqx_vm:get_process_gc_info(Pid)} of {undefined, _} -> undefined; {_, undefined} -> undefined; - {Info, GcInfo} -> Info ++ GcInfo + {Info, GcInfo} -> get_proc_lib_initial_call(Pid) ++ GcInfo ++ Info + end. + +get_proc_lib_initial_call(Pid) -> + case proc_lib:initial_call(Pid) of + false -> + []; + InitialCall -> + [{proc_lib_initial_call, InitialCall}] end. safe_publish(Event, WarnMsg) -> diff --git a/src/emqx_vm.erl b/src/emqx_vm.erl index 537330b01..adcbcabb0 100644 --- a/src/emqx_vm.erl +++ b/src/emqx_vm.erl @@ -58,11 +58,12 @@ sl_alloc, ll_alloc, fix_alloc, + literal_alloc, std_alloc ]). -define(PROCESS_INFO_KEYS, [initial_call, - current_function, + current_stacktrace, registered_name, status, message_queue_len, @@ -317,7 +318,8 @@ get_process_gc_info(Pid) when is_pid(Pid) -> process_info(Pid, ?PROCESS_GC_KEYS). get_process_group_leader_info(LeaderPid) when is_pid(LeaderPid) -> - [{Key, Value}|| {Key, Value} <- process_info(LeaderPid), lists:member(Key, ?PROCESS_INFO_KEYS)]. + [{Key, Value} + || {Key, Value} <- process_info(LeaderPid), lists:member(Key, ?PROCESS_INFO_KEYS)]. get_process_limit() -> erlang:system_info(process_limit). @@ -339,12 +341,12 @@ get_ets_info(Tab) -> mapping(Entries) -> mapping(Entries, []). mapping([], Acc) -> Acc; -mapping([{owner, V}|Entries], Acc) when is_pid(V) -> +mapping([{owner, V} | Entries], Acc) when is_pid(V) -> OwnerInfo = process_info(V), Owner = proplists:get_value(registered_name, OwnerInfo, undefined), - mapping(Entries, [{owner, Owner}|Acc]); -mapping([{Key, Value}|Entries], Acc) -> - mapping(Entries, [{Key, Value}|Acc]). + mapping(Entries, [{owner, Owner} | Acc]); +mapping([{Key, Value} | Entries], Acc) -> + mapping(Entries, [{Key, Value} | Acc]). avg1() -> compat_windows(fun cpu_sup:avg1/0). diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 60e2541aa..92d191b2a 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -85,7 +85,7 @@ t_chan_caps(_) -> #{max_clientid_len := 65535, max_qos_allowed := 2, max_topic_alias := 65535, - max_topic_levels := 0, + max_topic_levels := 128, retain_available := true, shared_subscription := true, subscription_identifiers := true, diff --git a/test/emqx_sys_mon_SUITE.erl b/test/emqx_sys_mon_SUITE.erl index 70f518ad5..63e7c376a 100644 --- a/test/emqx_sys_mon_SUITE.erl +++ b/test/emqx_sys_mon_SUITE.erl @@ -70,23 +70,51 @@ init_per_testcase(t_sys_mon2, Config) -> (_) -> ok end), Config; +init_per_testcase(t_procinfo, Config) -> + emqx_ct_helpers:boot_modules(all), + emqx_ct_helpers:start_apps([]), + ok = meck:new(emqx_vm, [passthrough, no_history]), + Config; init_per_testcase(_, Config) -> emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:start_apps([]), Config. +end_per_testcase(t_procinfo, _Config) -> + ok = meck:unload(emqx_vm), + emqx_ct_helpers:stop_apps([]); end_per_testcase(_, _Config) -> emqx_ct_helpers:stop_apps([]). t_procinfo(_) -> - ok = meck:new(emqx_vm, [passthrough, no_history]), ok = meck:expect(emqx_vm, get_process_info, fun(_) -> [] end), ok = meck:expect(emqx_vm, get_process_gc_info, fun(_) -> [] end), ?assertEqual([], emqx_sys_mon:procinfo([])), ok = meck:expect(emqx_vm, get_process_info, fun(_) -> ok end), ok = meck:expect(emqx_vm, get_process_gc_info, fun(_) -> undefined end), - ?assertEqual(undefined, emqx_sys_mon:procinfo([])), - ok = meck:unload(emqx_vm). + ?assertEqual(undefined, emqx_sys_mon:procinfo([])). + +t_procinfo_initial_call_and_stacktrace(_) -> + SomePid = proc_lib:spawn(?MODULE, some_function, [self(), arg2]), + receive + {spawned, SomePid} -> + ok + after 100 -> + error(process_not_spawned) + end, + ProcInfo = emqx_sys_mon:procinfo(SomePid), + ?assertEqual( + {?MODULE, some_function, ['Argument__1','Argument__2']}, + proplists:get_value(proc_lib_initial_call, ProcInfo)), + ?assertMatch( + [{?MODULE, some_function, 2, + [{file, _}, + {line, _}]}, + {proc_lib, init_p_do_apply, 3, + [{file, _}, + {line, _}]}], + proplists:get_value(current_stacktrace, ProcInfo)), + SomePid ! stop. t_sys_mon(_Config) -> lists:foreach( @@ -120,3 +148,10 @@ validate_sys_mon_info(PidOrPort, SysMonName,ValidateInfo, InfoOrPort) -> concat_str(ValidateInfo, InfoOrPort, Info) -> WarnInfo = io_lib:format(ValidateInfo, [InfoOrPort, Info]), lists:flatten(WarnInfo). + +some_function(Parent, _Arg2) -> + Parent ! {spawned, self()}, + receive + stop -> + ok + end.