fix(telemetry): wait for emqx to start before sending first report
This commit is contained in:
parent
97f2e5d544
commit
ad630f49ef
|
@ -21,7 +21,8 @@
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
-include_lib("kernel/include/file.hrl").
|
-include_lib("kernel/include/file.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
-logger_header("[Telemetry]").
|
-logger_header("[Telemetry]").
|
||||||
|
|
||||||
|
@ -159,7 +160,8 @@ init([Opts]) ->
|
||||||
end,
|
end,
|
||||||
case official_version(emqx_app:get_release()) of
|
case official_version(emqx_app:get_release()) of
|
||||||
true ->
|
true ->
|
||||||
{ok, ensure_report_timer(NState), {continue, first_report}};
|
_ = erlang:send(self(), first_report),
|
||||||
|
{ok, NState};
|
||||||
false ->
|
false ->
|
||||||
{ok, NState#state{enabled = false}}
|
{ok, NState#state{enabled = false}}
|
||||||
end.
|
end.
|
||||||
|
@ -168,7 +170,8 @@ handle_call(enable, _From, State = #state{uuid = UUID}) ->
|
||||||
mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID,
|
mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID,
|
||||||
uuid = UUID,
|
uuid = UUID,
|
||||||
enabled = true}),
|
enabled = true}),
|
||||||
{reply, ok, ensure_report_timer(State#state{enabled = true})};
|
_ = erlang:send(self(), first_report),
|
||||||
|
{reply, ok, State#state{enabled = true}};
|
||||||
|
|
||||||
handle_call(disable, _From, State = #state{uuid = UUID}) ->
|
handle_call(disable, _From, State = #state{uuid = UUID}) ->
|
||||||
mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID,
|
mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID,
|
||||||
|
@ -193,14 +196,19 @@ handle_cast(Msg, State) ->
|
||||||
?LOG(error, "Unexpected msg: ~p", [Msg]),
|
?LOG(error, "Unexpected msg: ~p", [Msg]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_continue(first_report, State) ->
|
|
||||||
report_telemetry(State),
|
|
||||||
{noreply, State};
|
|
||||||
|
|
||||||
handle_continue(Continue, State) ->
|
handle_continue(Continue, State) ->
|
||||||
?LOG(error, "Unexpected continue: ~p", [Continue]),
|
?LOG(error, "Unexpected continue: ~p", [Continue]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
|
handle_info(first_report, State) ->
|
||||||
|
case is_pid(erlang:whereis(emqx)) of
|
||||||
|
true ->
|
||||||
|
report_telemetry(State),
|
||||||
|
{noreply, ensure_report_timer(State)};
|
||||||
|
false ->
|
||||||
|
_ = erlang:send_after(1000, self(), first_report),
|
||||||
|
{noreply, State}
|
||||||
|
end;
|
||||||
handle_info({timeout, TRef, time_to_report_telemetry_data}, State = #state{timer = TRef,
|
handle_info({timeout, TRef, time_to_report_telemetry_data}, State = #state{timer = TRef,
|
||||||
enabled = false}) ->
|
enabled = false}) ->
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
@ -223,12 +231,8 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
official_version(Version) ->
|
official_version(Version) ->
|
||||||
case re:run(Version,
|
Pt = "^\\d+\\.\\d+(?:(-(?:alpha|beta|rc)\\.[1-9][0-9]*)|\\.\\d+)$",
|
||||||
"^\\d+\\.\\d+(?:(-(?:alpha|beta|rc)\\.[1-9][0-9]*)|\\.\\d+)$",
|
match =:= re:run(Version, Pt, [{capture, none}]).
|
||||||
[{capture, none}]) of
|
|
||||||
match -> true;
|
|
||||||
nomatch -> false
|
|
||||||
end.
|
|
||||||
|
|
||||||
ensure_report_timer(State = #state{report_interval = ReportInterval}) ->
|
ensure_report_timer(State = #state{report_interval = ReportInterval}) ->
|
||||||
State#state{timer = emqx_misc:start_timer(ReportInterval, time_to_report_telemetry_data)}.
|
State#state{timer = emqx_misc:start_timer(ReportInterval, time_to_report_telemetry_data)}.
|
||||||
|
@ -361,9 +365,11 @@ report_telemetry(State = #state{url = URL}) ->
|
||||||
Data = get_telemetry(State),
|
Data = get_telemetry(State),
|
||||||
case emqx_json:safe_encode(Data) of
|
case emqx_json:safe_encode(Data) of
|
||||||
{ok, Bin} ->
|
{ok, Bin} ->
|
||||||
httpc_request(post, URL, [], Bin);
|
httpc_request(post, URL, [], Bin),
|
||||||
|
?tp(debug, telemetry_data_reported, #{});
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?LOG(debug, "Encode ~p failed due to ~p", [Data, Reason])
|
%% debug? why?
|
||||||
|
?tp(debug, telemetry_data_encode_error, #{data => Data, reason => Reason})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
httpc_request(Method, URL, Headers, Body) ->
|
httpc_request(Method, URL, Headers, Body) ->
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
|
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
-import(proplists, [get_value/2]).
|
-import(proplists, [get_value/2]).
|
||||||
|
|
||||||
|
@ -74,6 +75,16 @@ t_enable(_) ->
|
||||||
ok = emqx_telemetry:disable(),
|
ok = emqx_telemetry:disable(),
|
||||||
?assertEqual(false, emqx_telemetry:is_enabled()).
|
?assertEqual(false, emqx_telemetry:is_enabled()).
|
||||||
|
|
||||||
|
t_send_after_enable(_) ->
|
||||||
|
ok = emqx_telemetry:disable(),
|
||||||
|
ok = snabbkaffe:start_trace(),
|
||||||
|
try
|
||||||
|
ok = emqx_telemetry:enable(),
|
||||||
|
?assertMatch({ok, _}, ?block_until(#{?snk_kind := telemetry_data_reported}, 2000, 100))
|
||||||
|
after
|
||||||
|
ok = snabbkaffe:stop()
|
||||||
|
end.
|
||||||
|
|
||||||
bin(L) when is_list(L) ->
|
bin(L) when is_list(L) ->
|
||||||
list_to_binary(L);
|
list_to_binary(L);
|
||||||
bin(B) when is_binary(B) ->
|
bin(B) when is_binary(B) ->
|
||||||
|
|
Loading…
Reference in New Issue