Merge remote-tracking branch 'origin/develop'
This commit is contained in:
commit
cc43da0fd5
|
@ -323,10 +323,9 @@ connecting(enter, _, #{reconnect_delay_ms := Timeout,
|
||||||
{ok, ConnRef, Conn} ->
|
{ok, ConnRef, Conn} ->
|
||||||
?LOG(info, "[Bridge] Bridge ~p connected", [name()]),
|
?LOG(info, "[Bridge] Bridge ~p connected", [name()]),
|
||||||
Action = {state_timeout, 0, connected},
|
Action = {state_timeout, 0, connected},
|
||||||
{keep_state,
|
State0 = State#{conn_ref => ConnRef, connection => Conn},
|
||||||
eval_bridge_handler(State#{ conn_ref => ConnRef
|
State1 = eval_bridge_handler(State0, connected),
|
||||||
, connection => Conn}, connected),
|
{keep_state, State1, Action};
|
||||||
Action};
|
|
||||||
error ->
|
error ->
|
||||||
Action = {state_timeout, Timeout, reconnect},
|
Action = {state_timeout, Timeout, reconnect},
|
||||||
{keep_state_and_data, Action}
|
{keep_state_and_data, Action}
|
||||||
|
@ -424,7 +423,7 @@ common(StateName, Type, Content, State) ->
|
||||||
eval_bridge_handler(State = #{bridge_handler := ?NO_BRIDGE_HANDLER}, _Msg) ->
|
eval_bridge_handler(State = #{bridge_handler := ?NO_BRIDGE_HANDLER}, _Msg) ->
|
||||||
State;
|
State;
|
||||||
eval_bridge_handler(State = #{bridge_handler := Handler}, Msg) ->
|
eval_bridge_handler(State = #{bridge_handler := Handler}, Msg) ->
|
||||||
_ = Handler(Msg),
|
Handler(Msg),
|
||||||
State.
|
State.
|
||||||
|
|
||||||
ensure_present(Key, Topic, State) ->
|
ensure_present(Key, Topic, State) ->
|
||||||
|
@ -564,9 +563,8 @@ disconnect(#{connection := Conn,
|
||||||
connect_module := Module
|
connect_module := Module
|
||||||
} = State) when Conn =/= undefined ->
|
} = State) when Conn =/= undefined ->
|
||||||
ok = Module:stop(ConnRef, Conn),
|
ok = Module:stop(ConnRef, Conn),
|
||||||
eval_bridge_handler(State#{conn_ref => undefined,
|
State0 = State#{conn_ref => undefined, connection => undefined},
|
||||||
connection => undefined},
|
eval_bridge_handler(State0, disconnected);
|
||||||
disconnected);
|
|
||||||
disconnect(State) ->
|
disconnect(State) ->
|
||||||
eval_bridge_handler(State, disconnected).
|
eval_bridge_handler(State, disconnected).
|
||||||
|
|
||||||
|
|
|
@ -928,6 +928,11 @@ handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) ->
|
||||||
?LOG(error, "[Client] Got tcp error: ~p", [Reason]),
|
?LOG(error, "[Client] Got tcp error: ~p", [Reason]),
|
||||||
{stop, {shutdown, Reason}, State};
|
{stop, {shutdown, Reason}, State};
|
||||||
|
|
||||||
|
handle_event(info, EventContent = {'EXIT', _Pid, normal}, StateName, _State) ->
|
||||||
|
?LOG(error, "[Client] State: ~s, Unexpected Event: (info, ~p)",
|
||||||
|
[StateName, EventContent]),
|
||||||
|
keep_state_and_data;
|
||||||
|
|
||||||
handle_event(EventType, EventContent, StateName, _StateData) ->
|
handle_event(EventType, EventContent, StateName, _StateData) ->
|
||||||
?LOG(error, "[Client] State: ~s, Unexpected Event: (~p, ~p)",
|
?LOG(error, "[Client] State: ~s, Unexpected Event: (~p, ~p)",
|
||||||
[StateName, EventType, EventContent]),
|
[StateName, EventType, EventContent]),
|
||||||
|
|
|
@ -676,6 +676,7 @@ terminate(Reason, #state{will_msg = WillMsg,
|
||||||
username = Username,
|
username = Username,
|
||||||
conn_pid = ConnPid,
|
conn_pid = ConnPid,
|
||||||
old_conn_pid = OldConnPid}) ->
|
old_conn_pid = OldConnPid}) ->
|
||||||
|
emqx_metrics:commit(),
|
||||||
send_willmsg(WillMsg),
|
send_willmsg(WillMsg),
|
||||||
[maybe_shutdown(Pid, Reason) || Pid <- [ConnPid, OldConnPid]],
|
[maybe_shutdown(Pid, Reason) || Pid <- [ConnPid, OldConnPid]],
|
||||||
ok = emqx_hooks:run('session.terminated', [#{client_id => ClientId, username => Username}, Reason]).
|
ok = emqx_hooks:run('session.terminated', [#{client_id => ClientId, username => Username}, Reason]).
|
||||||
|
|
|
@ -76,11 +76,21 @@ trace(publish, #message{from = From, topic = Topic, payload = Payload})
|
||||||
%% @doc Start to trace client_id or topic.
|
%% @doc Start to trace client_id or topic.
|
||||||
-spec(start_trace(trace_who(), logger:level(), string()) -> ok | {error, term()}).
|
-spec(start_trace(trace_who(), logger:level(), string()) -> ok | {error, term()}).
|
||||||
start_trace({client_id, ClientId}, Level, LogFile) ->
|
start_trace({client_id, ClientId}, Level, LogFile) ->
|
||||||
start_trace({start_trace, {client_id, ClientId}, Level, LogFile});
|
do_start_trace({client_id, ClientId}, Level, LogFile);
|
||||||
start_trace({topic, Topic}, Level, LogFile) ->
|
start_trace({topic, Topic}, Level, LogFile) ->
|
||||||
start_trace({start_trace, {topic, Topic}, Level, LogFile}).
|
do_start_trace({topic, Topic}, Level, LogFile).
|
||||||
|
|
||||||
start_trace(Req) -> gen_server:call(?MODULE, Req, infinity).
|
do_start_trace(Who, Level, LogFile) ->
|
||||||
|
#{level := PrimaryLevel} = logger:get_primary_config(),
|
||||||
|
try logger:compare_levels(log_level(Level), PrimaryLevel) of
|
||||||
|
lt ->
|
||||||
|
{error, io_lib:format("Cannot trace at a log level (~s) lower than the primary log level (~s)", [Level, PrimaryLevel])};
|
||||||
|
_GtOrEq ->
|
||||||
|
gen_server:call(?MODULE, {start_trace, Who, Level, LogFile}, 5000)
|
||||||
|
catch
|
||||||
|
_:Error ->
|
||||||
|
{error, Error}
|
||||||
|
end.
|
||||||
|
|
||||||
%% @doc Stop tracing client_id or topic.
|
%% @doc Stop tracing client_id or topic.
|
||||||
-spec(stop_trace(trace_who()) -> ok | {error, term()}).
|
-spec(stop_trace(trace_who()) -> ok | {error, term()}).
|
||||||
|
@ -109,7 +119,7 @@ handle_call({start_trace, Who, Level, LogFile}, _From, State = #state{traces = T
|
||||||
config => #{type => halt, file => LogFile},
|
config => #{type => halt, file => LogFile},
|
||||||
filter_default => stop,
|
filter_default => stop,
|
||||||
filters => [{meta_key_filter,
|
filters => [{meta_key_filter,
|
||||||
{fun filter_by_meta_key/2, Who} }]}) of
|
{fun filter_by_meta_key/2, Who} }]}) of
|
||||||
ok ->
|
ok ->
|
||||||
?LOG(info, "[Tracer] Start trace for ~p", [Who]),
|
?LOG(info, "[Tracer] Start trace for ~p", [Who]),
|
||||||
{reply, ok, State#state{traces = maps:put(Who, {Level, LogFile}, Traces)}};
|
{reply, ok, State#state{traces = maps:put(Who, {Level, LogFile}, Traces)}};
|
||||||
|
@ -168,3 +178,14 @@ filter_by_meta_key(#{meta:=Meta}=LogEvent, {MetaKey, MetaValue}) ->
|
||||||
end;
|
end;
|
||||||
_ -> ignore
|
_ -> ignore
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
log_level(emergency) -> emergency;
|
||||||
|
log_level(alert) -> alert;
|
||||||
|
log_level(critical) -> critical;
|
||||||
|
log_level(error) -> error;
|
||||||
|
log_level(warning) -> warning;
|
||||||
|
log_level(notice) -> notice;
|
||||||
|
log_level(info) -> info;
|
||||||
|
log_level(debug) -> debug;
|
||||||
|
log_level(all) -> debug;
|
||||||
|
log_level(_) -> throw(invalid_log_level).
|
||||||
|
|
|
@ -38,8 +38,12 @@ start_traces(_Config) ->
|
||||||
emqx_client:connect(T),
|
emqx_client:connect(T),
|
||||||
|
|
||||||
%% Start tracing
|
%% Start tracing
|
||||||
|
emqx_logger:set_log_level(error),
|
||||||
|
{error, _} = emqx_tracer:start_trace({client_id, <<"client">>}, debug, "tmp/client.log"),
|
||||||
|
emqx_logger:set_log_level(debug),
|
||||||
ok = emqx_tracer:start_trace({client_id, <<"client">>}, debug, "tmp/client.log"),
|
ok = emqx_tracer:start_trace({client_id, <<"client">>}, debug, "tmp/client.log"),
|
||||||
ok = emqx_tracer:start_trace({client_id, <<"client2">>}, all, "tmp/client2.log"),
|
ok = emqx_tracer:start_trace({client_id, <<"client2">>}, all, "tmp/client2.log"),
|
||||||
|
{error, invalid_log_level} = emqx_tracer:start_trace({client_id, <<"client3">>}, bad_level, "tmp/client3.log"),
|
||||||
ok = emqx_tracer:start_trace({topic, <<"a/#">>}, all, "tmp/topic_trace.log"),
|
ok = emqx_tracer:start_trace({topic, <<"a/#">>}, all, "tmp/topic_trace.log"),
|
||||||
ct:sleep(100),
|
ct:sleep(100),
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue