parent
5eadca1782
commit
78144c0ca5
|
@ -214,7 +214,7 @@ safe_publish(Msg) when is_record(Msg, message) ->
|
||||||
publish(Msg)
|
publish(Msg)
|
||||||
catch
|
catch
|
||||||
_:Error:Stk->
|
_:Error:Stk->
|
||||||
?LOG(error, "Publish error: ~p~n~s~n~p",
|
?LOG(error, "Publish error: ~0p~n~s~n~0p",
|
||||||
[Error, emqx_message:format(Msg), Stk])
|
[Error, emqx_message:format(Msg), Stk])
|
||||||
after
|
after
|
||||||
[]
|
[]
|
||||||
|
|
|
@ -276,7 +276,7 @@ discard_session(ClientId) when is_binary(ClientId) ->
|
||||||
_:{noproc,_}:_Stk -> ok;
|
_:{noproc,_}:_Stk -> ok;
|
||||||
_:{{shutdown,_},_}:_Stk -> ok;
|
_:{{shutdown,_},_}:_Stk -> ok;
|
||||||
_:Error:_Stk ->
|
_:Error:_Stk ->
|
||||||
?LOG(error, "Failed to discard ~p: ~p", [ChanPid, Error])
|
?LOG(error, "Failed to discard ~0p: ~0p", [ChanPid, Error])
|
||||||
end
|
end
|
||||||
end, ChanPids)
|
end, ChanPids)
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -319,7 +319,7 @@ handle_msg({'$gen_call', From, Req}, State) ->
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
|
handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
|
||||||
?LOG(debug, "RECV ~p", [Data]),
|
?LOG(debug, "RECV ~0p", [Data]),
|
||||||
Oct = iolist_size(Data),
|
Oct = iolist_size(Data),
|
||||||
emqx_pd:inc_counter(incoming_bytes, Oct),
|
emqx_pd:inc_counter(incoming_bytes, Oct),
|
||||||
ok = emqx_metrics:inc('bytes.received', Oct),
|
ok = emqx_metrics:inc('bytes.received', Oct),
|
||||||
|
@ -513,7 +513,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
|
||||||
parse_incoming(Rest, [Packet|Packets], NState)
|
parse_incoming(Rest, [Packet|Packets], NState)
|
||||||
catch
|
catch
|
||||||
error:Reason:Stk ->
|
error:Reason:Stk ->
|
||||||
?LOG(error, "~nParse failed for ~p~n~p~nFrame data:~p",
|
?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data:~0p",
|
||||||
[Reason, Stk, Data]),
|
[Reason, Stk, Data]),
|
||||||
{[{frame_error, Reason}|Packets], State}
|
{[{frame_error, Reason}|Packets], State}
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -106,7 +106,7 @@ run_command(Cmd, Args) when is_atom(Cmd) ->
|
||||||
_ -> ok
|
_ -> ok
|
||||||
catch
|
catch
|
||||||
_:Reason:Stacktrace ->
|
_:Reason:Stacktrace ->
|
||||||
?ERROR("CMD Error:~p, Stacktrace:~p", [Reason, Stacktrace]),
|
?ERROR("CMD Error:~0p, Stacktrace:~0p", [Reason, Stacktrace]),
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end;
|
end;
|
||||||
[] ->
|
[] ->
|
||||||
|
|
|
@ -165,7 +165,7 @@ safe_execute(Fun, Args) ->
|
||||||
Result -> Result
|
Result -> Result
|
||||||
catch
|
catch
|
||||||
_:Reason:Stacktrace ->
|
_:Reason:Stacktrace ->
|
||||||
?LOG(error, "Failed to execute ~p: ~p", [Fun, {Reason, Stacktrace}]),
|
?LOG(error, "Failed to execute ~0p: ~0p", [Fun, {Reason, Stacktrace}]),
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -51,7 +51,7 @@ start_listener({Proto, ListenOn, Options}) ->
|
||||||
{ok, _} -> io:format("Start mqtt:~s listener on ~s successfully.~n",
|
{ok, _} -> io:format("Start mqtt:~s listener on ~s successfully.~n",
|
||||||
[Proto, format(ListenOn)]);
|
[Proto, format(ListenOn)]);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
io:format(standard_error, "Failed to start mqtt:~s listener on ~s - ~p~n!",
|
io:format(standard_error, "Failed to start mqtt:~s listener on ~s - ~0p~n!",
|
||||||
[Proto, format(ListenOn), Reason])
|
[Proto, format(ListenOn), Reason])
|
||||||
end,
|
end,
|
||||||
StartRet.
|
StartRet.
|
||||||
|
|
|
@ -101,6 +101,12 @@ rules_from_file(AclFile) ->
|
||||||
Rules = [emqx_access_rule:compile(Term) || Term <- Terms],
|
Rules = [emqx_access_rule:compile(Term) || Term <- Terms],
|
||||||
#{publish => [Rule || Rule <- Rules, filter(publish, Rule)],
|
#{publish => [Rule || Rule <- Rules, filter(publish, Rule)],
|
||||||
subscribe => [Rule || Rule <- Rules, filter(subscribe, Rule)]};
|
subscribe => [Rule || Rule <- Rules, filter(subscribe, Rule)]};
|
||||||
|
{error, eacces} ->
|
||||||
|
?LOG(alert, "Insufficient permissions to read the ~s file", [AclFile]),
|
||||||
|
#{};
|
||||||
|
{error, enoent} ->
|
||||||
|
?LOG(alert, "The ~s file does not exist", [AclFile]),
|
||||||
|
#{};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?LOG(alert, "Failed to read ~s: ~p", [AclFile, Reason]),
|
?LOG(alert, "Failed to read ~s: ~p", [AclFile, Reason]),
|
||||||
#{}
|
#{}
|
||||||
|
|
|
@ -442,7 +442,7 @@ format_variable(undefined, _) ->
|
||||||
format_variable(Variable, undefined) ->
|
format_variable(Variable, undefined) ->
|
||||||
format_variable(Variable);
|
format_variable(Variable);
|
||||||
format_variable(Variable, Payload) ->
|
format_variable(Variable, Payload) ->
|
||||||
io_lib:format("~s, Payload=~p", [format_variable(Variable), Payload]).
|
io_lib:format("~s, Payload=~0p", [format_variable(Variable), Payload]).
|
||||||
|
|
||||||
format_variable(#mqtt_packet_connect{
|
format_variable(#mqtt_packet_connect{
|
||||||
proto_ver = ProtoVer,
|
proto_ver = ProtoVer,
|
||||||
|
@ -460,7 +460,7 @@ format_variable(#mqtt_packet_connect{
|
||||||
Format = "ClientId=~s, ProtoName=~s, ProtoVsn=~p, CleanStart=~s, KeepAlive=~p, Username=~s, Password=~s",
|
Format = "ClientId=~s, ProtoName=~s, ProtoVsn=~p, CleanStart=~s, KeepAlive=~p, Username=~s, Password=~s",
|
||||||
Args = [ClientId, ProtoName, ProtoVer, CleanStart, KeepAlive, Username, format_password(Password)],
|
Args = [ClientId, ProtoName, ProtoVer, CleanStart, KeepAlive, Username, format_password(Password)],
|
||||||
{Format1, Args1} = if
|
{Format1, Args1} = if
|
||||||
WillFlag -> {Format ++ ", Will(Q~p, R~p, Topic=~s, Payload=~p)",
|
WillFlag -> {Format ++ ", Will(Q~p, R~p, Topic=~s, Payload=~0p)",
|
||||||
Args ++ [WillQoS, i(WillRetain), WillTopic, WillPayload]};
|
Args ++ [WillQoS, i(WillRetain), WillTopic, WillPayload]};
|
||||||
true -> {Format, Args}
|
true -> {Format, Args}
|
||||||
end,
|
end,
|
||||||
|
|
|
@ -283,7 +283,7 @@ start_app(App, SuccFun) ->
|
||||||
SuccFun(App),
|
SuccFun(App),
|
||||||
ok;
|
ok;
|
||||||
{error, {ErrApp, Reason}} ->
|
{error, {ErrApp, Reason}} ->
|
||||||
?LOG(error, "Load plugin ~s failed, cannot start plugin ~s for ~p", [App, ErrApp, Reason]),
|
?LOG(error, "Load plugin ~s failed, cannot start plugin ~s for ~0p", [App, ErrApp, Reason]),
|
||||||
{error, {ErrApp, Reason}}
|
{error, {ErrApp, Reason}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -107,7 +107,7 @@ handle_call(Req, _From, State) ->
|
||||||
handle_cast({async_submit, Task}, State) ->
|
handle_cast({async_submit, Task}, State) ->
|
||||||
try run(Task)
|
try run(Task)
|
||||||
catch _:Error:Stacktrace ->
|
catch _:Error:Stacktrace ->
|
||||||
?LOG(error, "Error: ~p, ~p", [Error, Stacktrace])
|
?LOG(error, "Error: ~0p, ~0p", [Error, Stacktrace])
|
||||||
end,
|
end,
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
|
|
|
@ -35,6 +35,6 @@ lookup(psk, ClientPSKID, _UserState) ->
|
||||||
error
|
error
|
||||||
catch
|
catch
|
||||||
Except:Error:Stacktrace ->
|
Except:Error:Stacktrace ->
|
||||||
?LOG(error, "Lookup PSK failed, ~p: ~p", [{Except,Error}, Stacktrace]),
|
?LOG(error, "Lookup PSK failed, ~0p: ~0p", [{Except,Error}, Stacktrace]),
|
||||||
error
|
error
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -243,7 +243,7 @@ handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Update
|
||||||
try UpFun()
|
try UpFun()
|
||||||
catch
|
catch
|
||||||
_:Error ->
|
_:Error ->
|
||||||
?LOG(error, "Update ~s failed: ~p", [Name, Error])
|
?LOG(error, "Update ~s failed: ~0p", [Name, Error])
|
||||||
end,
|
end,
|
||||||
[Update#update{countdown = I} | Acc];
|
[Update#update{countdown = I} | Acc];
|
||||||
(Update = #update{countdown = C}, Acc) ->
|
(Update = #update{countdown = C}, Acc) ->
|
||||||
|
@ -272,6 +272,6 @@ safe_update_element(Key, Val) ->
|
||||||
true -> true
|
true -> true
|
||||||
catch
|
catch
|
||||||
error:badarg ->
|
error:badarg ->
|
||||||
?LOG(warning, "Failed to update ~p to ~p", [Key, Val])
|
?LOG(warning, "Failed to update ~0p to ~0p", [Key, Val])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -65,7 +65,7 @@ trace(publish, #message{topic = <<"$SYS/", _/binary>>}) ->
|
||||||
ignore;
|
ignore;
|
||||||
trace(publish, #message{from = From, topic = Topic, payload = Payload})
|
trace(publish, #message{from = From, topic = Topic, payload = Payload})
|
||||||
when is_binary(From); is_atom(From) ->
|
when is_binary(From); is_atom(From) ->
|
||||||
emqx_logger:info(#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY} }, "PUBLISH to ~s: ~p", [Topic, Payload]).
|
emqx_logger:info(#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY} }, "PUBLISH to ~s: ~0p", [Topic, Payload]).
|
||||||
|
|
||||||
%% @doc Start to trace clientid or topic.
|
%% @doc Start to trace clientid or topic.
|
||||||
-spec(start_trace(trace_who(), logger:level(), string()) -> ok | {error, term()}).
|
-spec(start_trace(trace_who(), logger:level(), string()) -> ok | {error, term()}).
|
||||||
|
|
|
@ -200,7 +200,7 @@ websocket_init([Req, Opts]) ->
|
||||||
?LOG(error, "Illegal cookie"),
|
?LOG(error, "Illegal cookie"),
|
||||||
undefined;
|
undefined;
|
||||||
Error:Reason ->
|
Error:Reason ->
|
||||||
?LOG(error, "Failed to parse cookie, Error: ~p, Reason ~p",
|
?LOG(error, "Failed to parse cookie, Error: ~0p, Reason ~0p",
|
||||||
[Error, Reason]),
|
[Error, Reason]),
|
||||||
undefined
|
undefined
|
||||||
end,
|
end,
|
||||||
|
@ -245,7 +245,7 @@ websocket_handle({binary, Data}, State) when is_list(Data) ->
|
||||||
websocket_handle({binary, iolist_to_binary(Data)}, State);
|
websocket_handle({binary, iolist_to_binary(Data)}, State);
|
||||||
|
|
||||||
websocket_handle({binary, Data}, State) ->
|
websocket_handle({binary, Data}, State) ->
|
||||||
?LOG(debug, "RECV ~p", [Data]),
|
?LOG(debug, "RECV ~0p", [Data]),
|
||||||
ok = inc_recv_stats(1, iolist_size(Data)),
|
ok = inc_recv_stats(1, iolist_size(Data)),
|
||||||
NState = ensure_stats_timer(State),
|
NState = ensure_stats_timer(State),
|
||||||
return(parse_incoming(Data, NState));
|
return(parse_incoming(Data, NState));
|
||||||
|
@ -458,7 +458,7 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) ->
|
||||||
parse_incoming(Rest, postpone({incoming, Packet}, NState))
|
parse_incoming(Rest, postpone({incoming, Packet}, NState))
|
||||||
catch
|
catch
|
||||||
error:Reason:Stk ->
|
error:Reason:Stk ->
|
||||||
?LOG(error, "~nParse failed for ~p~n~p~nFrame data: ~p",
|
?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data: ~0p",
|
||||||
[Reason, Stk, Data]),
|
[Reason, Stk, Data]),
|
||||||
FrameError = {frame_error, Reason},
|
FrameError = {frame_error, Reason},
|
||||||
postpone({incoming, FrameError}, State)
|
postpone({incoming, FrameError}, State)
|
||||||
|
|
Loading…
Reference in New Issue