feat: add mqtt runtime insights to telemetry data

This commit is contained in:
Thales Macedo Garitezi 2022-03-22 15:28:19 -03:00
parent bbb3cc6abf
commit 35a523230f
No known key found for this signature in database
GPG Key ID: DD279F8152A9B6DD
2 changed files with 104 additions and 23 deletions

View File

@ -76,10 +76,13 @@
-record(state, { -record(state, {
uuid :: undefined | binary(), uuid :: undefined | binary(),
url :: string(), url :: string(),
report_interval :: undefined | non_neg_integer(), report_interval :: non_neg_integer(),
timer = undefined :: undefined | reference() timer = undefined :: undefined | reference(),
previous_metrics = #{} :: map()
}). }).
-type state() :: #state{}.
%% The count of 100-nanosecond intervals between the UUID epoch %% The count of 100-nanosecond intervals between the UUID epoch
%% 1582-10-15 00:00:00 and the UNIX epoch 1970-01-01 00:00:00. %% 1582-10-15 00:00:00 and the UNIX epoch 1970-01-01 00:00:00.
-define(GREGORIAN_EPOCH_OFFSET, 16#01b21dd213814000). -define(GREGORIAN_EPOCH_OFFSET, 16#01b21dd213814000).
@ -136,6 +139,7 @@ get_telemetry() ->
%% is very small, it should be safe to ignore. %% is very small, it should be safe to ignore.
-dialyzer([{nowarn_function, [init/1]}]). -dialyzer([{nowarn_function, [init/1]}]).
init(_Opts) -> init(_Opts) ->
State0 = empty_state(),
UUID1 = UUID1 =
case mnesia:dirty_read(?TELEMETRY, ?UNIQUE_ID) of case mnesia:dirty_read(?TELEMETRY, ?UNIQUE_ID) of
[] -> [] ->
@ -148,19 +152,15 @@ init(_Opts) ->
[#telemetry{uuid = UUID} | _] -> [#telemetry{uuid = UUID} | _] ->
UUID UUID
end, end,
{ok, #state{ {ok, State0#state{uuid = UUID1}}.
url = ?TELEMETRY_URL,
report_interval = timer:seconds(?REPORT_INTERVAL),
uuid = UUID1
}}.
handle_call(enable, _From, State) -> handle_call(enable, _From, State0) ->
case ?MODULE:official_version(emqx_app:get_release()) of case ?MODULE:official_version(emqx_app:get_release()) of
true -> true ->
report_telemetry(State), State = report_telemetry(State0),
{reply, ok, ensure_report_timer(State)}; {reply, ok, ensure_report_timer(State)};
false -> false ->
{reply, {error, not_official_version}, State} {reply, {error, not_official_version}, State0}
end; end;
handle_call(disable, _From, State = #state{timer = Timer}) -> handle_call(disable, _From, State = #state{timer = Timer}) ->
case ?MODULE:official_version(emqx_app:get_release()) of case ?MODULE:official_version(emqx_app:get_release()) of
@ -173,7 +173,8 @@ handle_call(disable, _From, State = #state{timer = Timer}) ->
handle_call(get_uuid, _From, State = #state{uuid = UUID}) -> handle_call(get_uuid, _From, State = #state{uuid = UUID}) ->
{reply, {ok, UUID}, State}; {reply, {ok, UUID}, State};
handle_call(get_telemetry, _From, State) -> handle_call(get_telemetry, _From, State) ->
{reply, {ok, get_telemetry(State)}, State}; {_State, Telemetry} = get_telemetry(State),
{reply, {ok, Telemetry}, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}), ?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}. {reply, ignored, State}.
@ -186,10 +187,11 @@ handle_continue(Continue, State) ->
?SLOG(error, #{msg => "unexpected_continue", continue => Continue}), ?SLOG(error, #{msg => "unexpected_continue", continue => Continue}),
{noreply, State}. {noreply, State}.
handle_info({timeout, TRef, time_to_report_telemetry_data}, State = #state{timer = TRef}) -> handle_info({timeout, TRef, time_to_report_telemetry_data}, State0 = #state{timer = TRef}) ->
State =
case get_status() of case get_status() of
true -> report_telemetry(State); true -> report_telemetry(State0);
false -> ok false -> State0
end, end,
{noreply, ensure_report_timer(State)}; {noreply, ensure_report_timer(State)};
handle_info(Info, State) -> handle_info(Info, State) ->
@ -307,6 +309,9 @@ messages_sent() ->
messages_received() -> messages_received() ->
emqx_metrics:val('messages.received'). emqx_metrics:val('messages.received').
topic_count() ->
emqx_stats:getstat('topics.count').
generate_uuid() -> generate_uuid() ->
MicroSeconds = erlang:system_time(microsecond), MicroSeconds = erlang:system_time(microsecond),
Timestamp = MicroSeconds * 10 + ?GREGORIAN_EPOCH_OFFSET, Timestamp = MicroSeconds * 10 + ?GREGORIAN_EPOCH_OFFSET,
@ -323,9 +328,11 @@ generate_uuid() ->
) )
). ).
get_telemetry(#state{uuid = UUID}) -> -spec get_telemetry(state()) -> {state(), proplists:proplist()}.
get_telemetry(State0 = #state{uuid = UUID}) ->
OSInfo = os_info(), OSInfo = os_info(),
[ {MQTTRTInsights, State} = mqtt_runtime_insights(State0),
{State, [
{emqx_version, bin(emqx_app:get_release())}, {emqx_version, bin(emqx_app:get_release())},
{license, [{edition, <<"community">>}]}, {license, [{edition, <<"community">>}]},
{os_name, bin(get_value(os_name, OSInfo))}, {os_name, bin(get_value(os_name, OSInfo))},
@ -339,11 +346,12 @@ get_telemetry(#state{uuid = UUID}) ->
{messages_received, messages_received()}, {messages_received, messages_received()},
{messages_sent, messages_sent()}, {messages_sent, messages_sent()},
{build_info, build_info()}, {build_info, build_info()},
{vm_specs, vm_specs()} {vm_specs, vm_specs()},
]. {mqtt_runtime_insights, MQTTRTInsights}
]}.
report_telemetry(State = #state{url = URL}) -> report_telemetry(State0 = #state{url = URL}) ->
Data = get_telemetry(State), {State, Data} = get_telemetry(State0),
case emqx_json:safe_encode(Data) of case emqx_json:safe_encode(Data) of
{ok, Bin} -> {ok, Bin} ->
httpc_request(post, URL, [], Bin), httpc_request(post, URL, [], Bin),
@ -351,7 +359,8 @@ report_telemetry(State = #state{url = URL}) ->
{error, Reason} -> {error, Reason} ->
%% debug? why? %% debug? why?
?tp(debug, telemetry_data_encode_error, #{data => Data, reason => Reason}) ?tp(debug, telemetry_data_encode_error, #{data => Data, reason => Reason})
end. end,
State.
httpc_request(Method, URL, Headers, Body) -> httpc_request(Method, URL, Headers, Body) ->
HTTPOptions = [{timeout, 10_000}], HTTPOptions = [{timeout, 10_000}],
@ -401,9 +410,52 @@ vm_specs() ->
{total_memory, proplists:get_value(available_memory, SysMemData)} {total_memory, proplists:get_value(available_memory, SysMemData)}
]. ].
-spec mqtt_runtime_insights(state()) -> {map(), state()}.
mqtt_runtime_insights(State0) ->
{MQTTRates, State} = update_mqtt_rates(State0),
MQTTRTInsights = MQTTRates#{num_topics => topic_count()},
{MQTTRTInsights, State}.
-spec update_mqtt_rates(state()) -> {map(), state()}.
update_mqtt_rates(
State = #state{
previous_metrics = PrevMetrics0,
report_interval = ReportInterval
}
) when
is_integer(ReportInterval), ReportInterval > 0
->
MetricsToCheck =
[
{messages_sent_rate, messages_sent, fun messages_sent/0},
{messages_received_rate, messages_received, fun messages_received/0}
],
{Metrics, PrevMetrics} =
lists:foldl(
fun({RateKey, CountKey, Fun}, {Rates0, PrevMetrics1}) ->
NewCount = Fun(),
OldCount = maps:get(CountKey, PrevMetrics1, 0),
Rate = (NewCount - OldCount) / ReportInterval,
Rates = Rates0#{RateKey => Rate},
PrevMetrics2 = PrevMetrics1#{CountKey => NewCount},
{Rates, PrevMetrics2}
end,
{#{}, PrevMetrics0},
MetricsToCheck
),
{Metrics, State#state{previous_metrics = PrevMetrics}};
update_mqtt_rates(State) ->
{#{}, State}.
bin(L) when is_list(L) -> bin(L) when is_list(L) ->
list_to_binary(L); list_to_binary(L);
bin(A) when is_atom(A) -> bin(A) when is_atom(A) ->
atom_to_binary(A); atom_to_binary(A);
bin(B) when is_binary(B) -> bin(B) when is_binary(B) ->
B. B.
empty_state() ->
#state{
url = ?TELEMETRY_URL,
report_interval = timer:seconds(?REPORT_INTERVAL)
}.

View File

@ -134,6 +134,10 @@ t_get_telemetry(_Config) ->
?assert(0 =< get_value(num_cpus, VMSpecs)), ?assert(0 =< get_value(num_cpus, VMSpecs)),
?assert(is_integer(get_value(total_memory, VMSpecs))), ?assert(is_integer(get_value(total_memory, VMSpecs))),
?assert(0 =< get_value(total_memory, VMSpecs)), ?assert(0 =< get_value(total_memory, VMSpecs)),
MQTTRTInsights = get_value(mqtt_runtime_insights, TelemetryData),
?assert(is_number(maps:get(messages_sent_rate, MQTTRTInsights))),
?assert(is_number(maps:get(messages_received_rate, MQTTRTInsights))),
?assert(is_integer(maps:get(num_topics, MQTTRTInsights))),
ok. ok.
t_enable(_) -> t_enable(_) ->
@ -156,6 +160,31 @@ t_send_after_enable(_) ->
meck:unload([emqx_telemetry]) meck:unload([emqx_telemetry])
end. end.
t_mqtt_runtime_insights(_) ->
State0 = emqx_telemetry:empty_state(),
{MQTTRTInsights1, State1} = emqx_telemetry:mqtt_runtime_insights(State0),
?assertEqual(
#{
messages_sent_rate => 0.0,
messages_received_rate => 0.0,
num_topics => 0
},
MQTTRTInsights1
),
%% add some fake stats
emqx_metrics:set('messages.sent', 10_000_000_000),
emqx_metrics:set('messages.received', 20_000_000_000),
emqx_stats:setstat('topics.count', 30_000),
{MQTTRTInsights2, _State2} = emqx_telemetry:mqtt_runtime_insights(State1),
assert_approximate(MQTTRTInsights2, messages_sent_rate, "16.53"),
assert_approximate(MQTTRTInsights2, messages_received_rate, "33.07"),
?assertEqual(30_000, maps:get(num_topics, MQTTRTInsights2)),
ok.
assert_approximate(Map, Key, Expected) ->
Value = maps:get(Key, Map),
?assertEqual(Expected, float_to_list(Value, [{decimals, 2}])).
bin(L) when is_list(L) -> bin(L) when is_list(L) ->
list_to_binary(L); list_to_binary(L);
bin(B) when is_binary(B) -> bin(B) when is_binary(B) ->