diff --git a/apps/emqx_modules/src/emqx_modules.erl b/apps/emqx_modules/src/emqx_modules.erl new file mode 100644 index 000000000..7a3c6d117 --- /dev/null +++ b/apps/emqx_modules/src/emqx_modules.erl @@ -0,0 +1,37 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_modules). + +-export([ + get_advanced_mqtt_features_in_use/0, + set_advanced_mqtt_features_in_use/1 +]). + +-type advanced_mqtt_feature() :: delayed | topic_rewrite | retained | auto_subscribe. +-type advanced_mqtt_features_in_use() :: #{advanced_mqtt_feature() => boolean()}. + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +-spec get_advanced_mqtt_features_in_use() -> advanced_mqtt_features_in_use(). +get_advanced_mqtt_features_in_use() -> + application:get_env(?MODULE, advanced_mqtt_features_in_use, #{}). + +-spec set_advanced_mqtt_features_in_use(advanced_mqtt_features_in_use()) -> ok. +set_advanced_mqtt_features_in_use(Features) -> + application:set_env(?MODULE, advanced_mqtt_features_in_use, Features). diff --git a/apps/emqx_modules/src/emqx_modules_app.erl b/apps/emqx_modules/src/emqx_modules_app.erl index 51b314861..2d542a1a1 100644 --- a/apps/emqx_modules/src/emqx_modules_app.erl +++ b/apps/emqx_modules/src/emqx_modules_app.erl @@ -33,7 +33,21 @@ stop(_State) -> ok. maybe_enable_modules() -> - emqx_conf:get([delayed, enable], true) andalso emqx_delayed:enable(), + DelayedEnabled = emqx_conf:get([delayed, enable], true), + RewriteEnabled = length(emqx_conf:get([rewrite], [])) > 0, + RetainerEnabled = emqx_conf:get([retainer, enable], false), + AutoSubscribeEnabled = length(emqx_conf:get([auto_subscribe, topics], [])) > 0, + application:set_env( + emqx_modules, + advanced_mqtt_features_in_use, + #{ + delayed => DelayedEnabled, + topic_rewrite => RewriteEnabled, + retained => RetainerEnabled, + auto_subscribe => AutoSubscribeEnabled + } + ), + DelayedEnabled andalso emqx_delayed:enable(), emqx_conf:get([telemetry, enable], true) andalso emqx_telemetry:enable(), emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:enable(), emqx_conf_cli:load(), diff --git a/apps/emqx_modules/src/emqx_telemetry.erl b/apps/emqx_modules/src/emqx_telemetry.erl index 5a935cbd2..0fedf45a3 100644 --- a/apps/emqx_modules/src/emqx_telemetry.erl +++ b/apps/emqx_modules/src/emqx_telemetry.erl @@ -76,10 +76,13 @@ -record(state, { uuid :: undefined | binary(), url :: string(), - report_interval :: undefined | non_neg_integer(), - timer = undefined :: undefined | reference() + report_interval :: non_neg_integer(), + timer = undefined :: undefined | reference(), + previous_metrics = #{} :: map() }). +-type state() :: #state{}. + %% The count of 100-nanosecond intervals between the UUID epoch %% 1582-10-15 00:00:00 and the UNIX epoch 1970-01-01 00:00:00. -define(GREGORIAN_EPOCH_OFFSET, 16#01b21dd213814000). @@ -136,6 +139,7 @@ get_telemetry() -> %% is very small, it should be safe to ignore. -dialyzer([{nowarn_function, [init/1]}]). init(_Opts) -> + State0 = empty_state(), UUID1 = case mnesia:dirty_read(?TELEMETRY, ?UNIQUE_ID) of [] -> @@ -148,19 +152,15 @@ init(_Opts) -> [#telemetry{uuid = UUID} | _] -> UUID end, - {ok, #state{ - url = ?TELEMETRY_URL, - report_interval = timer:seconds(?REPORT_INTERVAL), - uuid = UUID1 - }}. + {ok, State0#state{uuid = UUID1}}. -handle_call(enable, _From, State) -> +handle_call(enable, _From, State0) -> case ?MODULE:official_version(emqx_app:get_release()) of true -> - report_telemetry(State), + State = report_telemetry(State0), {reply, ok, ensure_report_timer(State)}; false -> - {reply, {error, not_official_version}, State} + {reply, {error, not_official_version}, State0} end; handle_call(disable, _From, State = #state{timer = Timer}) -> case ?MODULE:official_version(emqx_app:get_release()) of @@ -173,7 +173,8 @@ handle_call(disable, _From, State = #state{timer = Timer}) -> handle_call(get_uuid, _From, State = #state{uuid = UUID}) -> {reply, {ok, UUID}, State}; handle_call(get_telemetry, _From, State) -> - {reply, {ok, get_telemetry(State)}, State}; + {_State, Telemetry} = get_telemetry(State), + {reply, {ok, Telemetry}, State}; handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. @@ -186,11 +187,12 @@ handle_continue(Continue, State) -> ?SLOG(error, #{msg => "unexpected_continue", continue => Continue}), {noreply, State}. -handle_info({timeout, TRef, time_to_report_telemetry_data}, State = #state{timer = TRef}) -> - case get_status() of - true -> report_telemetry(State); - false -> ok - end, +handle_info({timeout, TRef, time_to_report_telemetry_data}, State0 = #state{timer = TRef}) -> + State = + case get_status() of + true -> report_telemetry(State0); + false -> State0 + end, {noreply, ensure_report_timer(State)}; handle_info(Info, State) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), @@ -307,6 +309,9 @@ messages_sent() -> messages_received() -> emqx_metrics:val('messages.received'). +topic_count() -> + emqx_stats:getstat('topics.count'). + generate_uuid() -> MicroSeconds = erlang:system_time(microsecond), Timestamp = MicroSeconds * 10 + ?GREGORIAN_EPOCH_OFFSET, @@ -323,9 +328,11 @@ generate_uuid() -> ) ). -get_telemetry(#state{uuid = UUID}) -> +-spec get_telemetry(state()) -> {state(), proplists:proplist()}. +get_telemetry(State0 = #state{uuid = UUID}) -> OSInfo = os_info(), - [ + {MQTTRTInsights, State} = mqtt_runtime_insights(State0), + {State, [ {emqx_version, bin(emqx_app:get_release())}, {license, [{edition, <<"community">>}]}, {os_name, bin(get_value(os_name, OSInfo))}, @@ -339,11 +346,13 @@ get_telemetry(#state{uuid = UUID}) -> {messages_received, messages_received()}, {messages_sent, messages_sent()}, {build_info, build_info()}, - {vm_specs, vm_specs()} - ]. + {vm_specs, vm_specs()}, + {mqtt_runtime_insights, MQTTRTInsights}, + {advanced_mqtt_features, advanced_mqtt_features()} + ]}. -report_telemetry(State = #state{url = URL}) -> - Data = get_telemetry(State), +report_telemetry(State0 = #state{url = URL}) -> + {State, Data} = get_telemetry(State0), case emqx_json:safe_encode(Data) of {ok, Bin} -> httpc_request(post, URL, [], Bin), @@ -351,7 +360,8 @@ report_telemetry(State = #state{url = URL}) -> {error, Reason} -> %% debug? why? ?tp(debug, telemetry_data_encode_error, #{data => Data, reason => Reason}) - end. + end, + State. httpc_request(Method, URL, Headers, Body) -> HTTPOptions = [{timeout, 10_000}], @@ -401,9 +411,59 @@ vm_specs() -> {total_memory, proplists:get_value(available_memory, SysMemData)} ]. +-spec mqtt_runtime_insights(state()) -> {map(), state()}. +mqtt_runtime_insights(State0) -> + {MQTTRates, State} = update_mqtt_rates(State0), + MQTTRTInsights = MQTTRates#{num_topics => topic_count()}, + {MQTTRTInsights, State}. + +-spec update_mqtt_rates(state()) -> {map(), state()}. +update_mqtt_rates( + State = #state{ + previous_metrics = PrevMetrics0, + report_interval = ReportInterval + } +) when + is_integer(ReportInterval), ReportInterval > 0 +-> + MetricsToCheck = + [ + {messages_sent_rate, messages_sent, fun messages_sent/0}, + {messages_received_rate, messages_received, fun messages_received/0} + ], + {Metrics, PrevMetrics} = + lists:foldl( + fun({RateKey, CountKey, Fun}, {Rates0, PrevMetrics1}) -> + NewCount = Fun(), + OldCount = maps:get(CountKey, PrevMetrics1, 0), + Rate = (NewCount - OldCount) / ReportInterval, + Rates = Rates0#{RateKey => Rate}, + PrevMetrics2 = PrevMetrics1#{CountKey => NewCount}, + {Rates, PrevMetrics2} + end, + {#{}, PrevMetrics0}, + MetricsToCheck + ), + {Metrics, State#state{previous_metrics = PrevMetrics}}; +update_mqtt_rates(State) -> + {#{}, State}. + +advanced_mqtt_features() -> + AdvancedFeatures = emqx_modules:get_advanced_mqtt_features_in_use(), + maps:map(fun(_K, V) -> bool2int(V) end, AdvancedFeatures). + bin(L) when is_list(L) -> list_to_binary(L); bin(A) when is_atom(A) -> atom_to_binary(A); bin(B) when is_binary(B) -> B. + +bool2int(true) -> 1; +bool2int(false) -> 0. + +empty_state() -> + #state{ + url = ?TELEMETRY_URL, + report_interval = timer:seconds(?REPORT_INTERVAL) + }. diff --git a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl index cc8367e07..a78798ea8 100644 --- a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl +++ b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl @@ -29,11 +29,11 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> snabbkaffe:fix_ct_logging(), - emqx_common_test_helpers:start_apps([emqx_modules]), + emqx_common_test_helpers:start_apps([emqx_conf, emqx_modules]), Config. end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([emqx_modules]). + emqx_common_test_helpers:stop_apps([emqx_conf, emqx_modules]). init_per_testcase(t_get_telemetry, Config) -> DataDir = ?config(data_dir, Config), @@ -66,10 +66,21 @@ init_per_testcase(t_get_telemetry, Config) -> end ), Config; +init_per_testcase(t_advanced_mqtt_features, Config) -> + OldValues = emqx_modules:get_advanced_mqtt_features_in_use(), + emqx_modules:set_advanced_mqtt_features_in_use(#{ + delayed => false, + topic_rewrite => false, + retained => false, + auto_subscribe => false + }), + [{old_values, OldValues} | Config]; init_per_testcase(_Testcase, Config) -> TestPID = self(), ok = meck:new(httpc, [non_strict, passthrough, no_history, no_link]), - ok = meck:expect(httpc, request, fun(Method, URL, Headers, Body) -> + ok = meck:expect(httpc, request, fun( + Method, {URL, Headers, _ContentType, Body}, _HTTPOpts, _Opts + ) -> TestPID ! {request, Method, URL, Headers, Body} end), Config. @@ -77,6 +88,9 @@ init_per_testcase(_Testcase, Config) -> end_per_testcase(t_get_telemetry, _Config) -> meck:unload([httpc, emqx_telemetry]), ok; +end_per_testcase(t_advanced_mqtt_features, Config) -> + OldValues = ?config(old_values, Config), + emqx_modules:set_advanced_mqtt_features_in_use(OldValues); end_per_testcase(_Testcase, _Config) -> meck:unload([httpc]), ok. @@ -134,6 +148,39 @@ t_get_telemetry(_Config) -> ?assert(0 =< get_value(num_cpus, VMSpecs)), ?assert(is_integer(get_value(total_memory, VMSpecs))), ?assert(0 =< get_value(total_memory, VMSpecs)), + MQTTRTInsights = get_value(mqtt_runtime_insights, TelemetryData), + ?assert(is_number(maps:get(messages_sent_rate, MQTTRTInsights))), + ?assert(is_number(maps:get(messages_received_rate, MQTTRTInsights))), + ?assert(is_integer(maps:get(num_topics, MQTTRTInsights))), + ok. + +t_advanced_mqtt_features(_) -> + {ok, TelemetryData} = emqx_telemetry:get_telemetry(), + AdvFeats = get_value(advanced_mqtt_features, TelemetryData), + ?assertEqual( + #{ + retained => 0, + topic_rewrite => 0, + auto_subscribe => 0, + delayed => 0 + }, + AdvFeats + ), + lists:foreach( + fun(TelemetryKey) -> + EnabledFeats = emqx_modules:get_advanced_mqtt_features_in_use(), + emqx_modules:set_advanced_mqtt_features_in_use(EnabledFeats#{TelemetryKey => true}), + {ok, Data} = emqx_telemetry:get_telemetry(), + #{TelemetryKey := Value} = get_value(advanced_mqtt_features, Data), + ?assertEqual(1, Value, #{key => TelemetryKey}) + end, + [ + retained, + topic_rewrite, + auto_subscribe, + delayed + ] + ), ok. t_enable(_) -> @@ -150,12 +197,70 @@ t_send_after_enable(_) -> ok = snabbkaffe:start_trace(), try ok = emqx_telemetry:enable(), - ?assertMatch({ok, _}, ?block_until(#{?snk_kind := telemetry_data_reported}, 2000, 100)) + ?assertMatch({ok, _}, ?block_until(#{?snk_kind := telemetry_data_reported}, 2000, 100)), + receive + {request, post, _URL, _Headers, Body} -> + {ok, Decoded} = emqx_json:safe_decode(Body, [return_maps]), + ?assertMatch( + #{ + <<"uuid">> := _, + <<"messages_received">> := _, + <<"messages_sent">> := _, + <<"build_info">> := #{}, + <<"vm_specs">> := + #{ + <<"num_cpus">> := _, + <<"total_memory">> := _ + }, + <<"mqtt_runtime_insights">> := + #{ + <<"messages_received_rate">> := _, + <<"messages_sent_rate">> := _, + <<"num_topics">> := _ + }, + <<"advanced_mqtt_features">> := + #{ + <<"retained">> := _, + <<"topic_rewrite">> := _, + <<"auto_subscribe">> := _, + <<"delayed">> := _ + } + }, + Decoded + ) + after 2100 -> + exit(telemetry_not_reported) + end after ok = snabbkaffe:stop(), meck:unload([emqx_telemetry]) end. +t_mqtt_runtime_insights(_) -> + State0 = emqx_telemetry:empty_state(), + {MQTTRTInsights1, State1} = emqx_telemetry:mqtt_runtime_insights(State0), + ?assertEqual( + #{ + messages_sent_rate => 0.0, + messages_received_rate => 0.0, + num_topics => 0 + }, + MQTTRTInsights1 + ), + %% add some fake stats + emqx_metrics:set('messages.sent', 10_000_000_000), + emqx_metrics:set('messages.received', 20_000_000_000), + emqx_stats:setstat('topics.count', 30_000), + {MQTTRTInsights2, _State2} = emqx_telemetry:mqtt_runtime_insights(State1), + assert_approximate(MQTTRTInsights2, messages_sent_rate, "16.53"), + assert_approximate(MQTTRTInsights2, messages_received_rate, "33.07"), + ?assertEqual(30_000, maps:get(num_topics, MQTTRTInsights2)), + ok. + +assert_approximate(Map, Key, Expected) -> + Value = maps:get(Key, Map), + ?assertEqual(Expected, float_to_list(Value, [{decimals, 2}])). + bin(L) when is_list(L) -> list_to_binary(L); bin(B) when is_binary(B) ->