diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index c998a1401..a930ade17 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -476,7 +476,7 @@ handle_msg({inet_reply, _Sock, {error, Reason}}, State) -> handle_info({sock_error, Reason}, State); handle_msg({close, Reason}, State) -> - ?LOG(debug, "Force to close the socket due to ~p", [Reason]), + ?SLOG(debug, #{msg => "force_socket_close", reason => Reason}), handle_info({sock_closed, Reason}, close_socket(State)); handle_msg({event, connected}, State = #state{ @@ -525,7 +525,7 @@ handle_msg(Msg, State) -> terminate(Reason, State = #state{ chann_mod = ChannMod, channel = Channel}) -> - ?LOG(debug, "Terminated due to ~p", [Reason]), + ?SLOG(debug, #{msg => "conn_process_terminated", reason => Reason}), _ = ChannMod:terminate(Reason, Channel), _ = close_socket(State), exit(Reason). @@ -620,7 +620,7 @@ handle_timeout(TRef, Msg, State) -> parse_incoming(Data, State = #state{ chann_mod = ChannMod, channel = Channel}) -> - ?LOG(debug, "RECV ~0p", [Data]), + ?SLOG(debug, #{msg => "RECV_data", data => Data}), Oct = iolist_size(Data), inc_counter(incoming_bytes, Oct), Ctx = ChannMod:info(ctx, Channel), @@ -643,8 +643,12 @@ parse_incoming(Data, Packets, parse_incoming(Rest, [Packet|Packets], NState) catch error:Reason:Stk -> - ?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data:~0p", - [Reason, Stk, Data]), + ?SLOG(error, #{ msg => "parse_frame_failed" + , at_state => ParseState + , input_bytes => Data + , reason => Reason + , stacktrace => Stk + }), {[{frame_error, Reason}|Packets], State} end. @@ -663,7 +667,9 @@ handle_incoming(Packet, State = #state{ }) -> Ctx = ChannMod:info(ctx, Channel), ok = inc_incoming_stats(Ctx, FrameMod, Packet), - ?LOG(debug, "RECV ~ts", [FrameMod:format(Packet)]), + ?SLOG(debug, #{ msg => "RECV_packet" + , packet => FrameMod:format(Packet) + }), with_channel(handle_in, [Packet], State). %%-------------------------------------------------------------------- @@ -715,14 +721,19 @@ serialize_and_inc_stats_fun(#state{ Ctx = ChannMod:info(ctx, Channel), fun(Packet) -> case FrameMod:serialize_pkt(Packet, Serialize) of - <<>> -> ?LOG(warning, "~ts is discarded due to the frame is too large!", - [FrameMod:format(Packet)]), - ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped.too_large'), - ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'), - <<>>; - Data -> ?LOG(debug, "SEND ~ts", [FrameMod:format(Packet)]), - ok = inc_outgoing_stats(Ctx, FrameMod, Packet), - Data + <<>> -> + ?SLOG(warning, #{ msg => "packet_too_large_discarded" + , packet => FrameMod:format(Packet) + }), + ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped.too_large'), + ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'), + <<>>; + Data -> + ?SLOG(debug, #{ msg => "SEND_packet" + , packet => FrameMod:format(Packet) + }), + ok = inc_outgoing_stats(Ctx, FrameMod, Packet), + Data end end. @@ -760,7 +771,9 @@ handle_info(activate_socket, State = #state{sockstate = OldSst}) -> end; handle_info({sock_error, Reason}, State) -> - ?LOG(debug, "Socket error: ~p", [Reason]), + ?SLOG(debug, #{ msg => "sock_error" + , reason => Reason + }), handle_info({sock_closed, Reason}, close_socket(State)); handle_info(Info, State) -> @@ -775,7 +788,10 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) -> {ok, Limiter1} -> State#state{limiter = Limiter1}; {pause, Time, Limiter1} -> - ?LOG(warning, "Pause ~pms due to rate limit", [Time]), + %% XXX: which limiter reached? + ?SLOG(warning, #{ msg => "reach_rate_limit" + , pause => Time + }), TRef = emqx_misc:start_timer(Time, limit_timeout), State#state{sockstate = blocked, limiter = Limiter1, diff --git a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl index bc34b5e5b..839567d1e 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl @@ -189,14 +189,14 @@ handle_call({send_request, Msg}, From, Channel) -> erlang:setelement(1, Result, noreply); handle_call(Req, _From, Channel) -> - ?LOG(error, "Unexpected call: ~p", [Req]), + ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, Channel}. %%-------------------------------------------------------------------- %% Handle Cast %%-------------------------------------------------------------------- handle_cast(Req, Channel) -> - ?LOG(error, "Unexpected cast: ~p", [Req]), + ?SLOG(error, #{msg => "unexpected_cast", cast => Req}), {ok, Channel}. %%-------------------------------------------------------------------- @@ -206,7 +206,7 @@ handle_info({subscribe, _}, Channel) -> {ok, Channel}; handle_info(Info, Channel) -> - ?LOG(error, "Unexpected info: ~p", [Info]), + ?SLOG(error, #{msg => "unexpected_info", info => Info}), {ok, Channel}. %%-------------------------------------------------------------------- @@ -331,8 +331,11 @@ auth_connect(_Input, Channel = #channel{ctx = Ctx, {ok, NClientInfo} -> {ok, Channel#channel{clientinfo = NClientInfo}}; {error, Reason} -> - ?LOG(warning, "Client ~ts (Username: '~ts') login failed for ~0p", - [ClientId, Username, Reason]), + ?SLOG(warning, #{ msg => "client_login_failed" + , username => Username + , clientid => ClientId + , reason => Reason + }), {error, Reason} end. @@ -375,7 +378,10 @@ process_connect(#channel{ctx = Ctx, reply({ok, created}, Token, Msg, Result), Channel#channel{token = Token}); {error, Reason} -> - ?LOG(error, "Failed to open session du to ~p", [Reason]), + ?SLOG(error, #{ msg => "failed_open_session" + , clientid => maps:get(clientid, ClientInfo) + , reason => Reason + }), iter(Iter, reply({error, bad_request}, Msg, Result), Channel) end. diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index 71191c773..480e259e9 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -110,8 +110,9 @@ clients_insta(get, #{ bindings := #{name := Name0, [ClientInfo] -> {200, ClientInfo}; [ClientInfo | _More] -> - ?LOG(warning, "More than one client info was returned on ~ts", - [ClientId]), + ?SLOG(warning, #{ msg => "more_than_one_channel_found" + , clientid => ClientId + }), {200, ClientInfo}; [] -> return_http_error(404, "Client not found") diff --git a/apps/emqx_gateway/src/emqx_gateway_app.erl b/apps/emqx_gateway/src/emqx_gateway_app.erl index 013bb35c9..8b09f18a0 100644 --- a/apps/emqx_gateway/src/emqx_gateway_app.erl +++ b/apps/emqx_gateway/src/emqx_gateway_app.erl @@ -40,7 +40,6 @@ stop(_State) -> load_default_gateway_applications() -> Apps = gateway_type_searching(), - ?LOG(info, "Starting the default gateway types: ~p", [Apps]), lists:foreach(fun reg/1, Apps). gateway_type_searching() -> @@ -51,12 +50,16 @@ gateway_type_searching() -> reg(Mod) -> try Mod:reg(), - ?LOG(info, "Register ~ts gateway application successfully!", [Mod]) + ?SLOG(debug, #{ msg => "register_gateway_succeed" + , callback_module => Mod + }) catch Class : Reason : Stk -> - ?LOG(error, "Failed to register ~ts gateway application: {~p, ~p}\n" - "Stacktrace: ~0p", - [Mod, Class, Reason, Stk]) + ?SLOG(error, #{ msg => "failed_to_register_gateway" + , callback_module => Mod + , reason => {Class, Reason} + , stacktrace => Stk + }) end. load_gateway_by_default() -> @@ -67,14 +70,19 @@ load_gateway_by_default([]) -> load_gateway_by_default([{Type, Confs}|More]) -> case emqx_gateway_registry:lookup(Type) of undefined -> - ?LOG(error, "Skip to load ~ts gateway, because it is not registered", - [Type]); + ?SLOG(error, #{ msg => "skip_to_load_gateway" + , gateway_name => Type + }); _ -> case emqx_gateway:load(Type, Confs) of {ok, _} -> - ?LOG(debug, "Load ~ts gateway successfully!", [Type]); + ?SLOG(debug, #{ msg => "load_gateway_succeed" + , gateway_name => Type + }); {error, Reason} -> - ?LOG(error, "Failed to load ~ts gateway: ~0p", [Type, Reason]) + ?SLOG(error, #{ msg => "load_gateway_failed" + , gateway_name => Type + , reason => Reason}) end end, load_gateway_by_default(More). diff --git a/apps/emqx_gateway/src/emqx_gateway_cm.erl b/apps/emqx_gateway/src/emqx_gateway_cm.erl index d8b615fe8..99fa43720 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm.erl @@ -282,8 +282,12 @@ create_session(GwName, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) -> Session catch Class : Reason : Stk -> - ?LOG(error, "Failed to create a session: ~p, ~p " - "Stacktrace:~0p", [Class, Reason, Stk]), + ?SLOG(error, #{ msg => "failed_create_session" + , clientid => maps:get(clientid, ClientInfo, undefined) + , username => maps:get(username, ClientInfo, undefined) + , reason => {Class, Reason} + , stacktrace => Stk + }), throw(Reason) end. @@ -337,7 +341,9 @@ kick_session(GwName, ClientId) -> kick_session(GwName, ClientId, ChanPid); ChanPids -> [ChanPid|StalePids] = lists:reverse(ChanPids), - ?LOG(error, "More than one channel found: ~p", [ChanPids]), + ?SLOG(error, #{ msg => "more_than_one_channel_found" + , chan_pids => ChanPids + }), lists:foreach(fun(StalePid) -> catch discard_session(GwName, ClientId, StalePid) end, StalePids), diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index aab99ac13..07a5ae3c4 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -336,8 +336,10 @@ with_gateway(GwName0, Fun) -> error : {update_conf_error, already_exist} -> return_http_error(400, "Resource already exist"); Class : Reason : Stk -> - ?LOG(error, "Uncatched error: {~p, ~p}, stacktrace: ~0p", - [Class, Reason, Stk]), + ?SLOG(error, #{ msg => "uncatched_error" + , reason => {Class, Reason} + , stacktrace => Stk + }), return_http_error(500, {Class, Reason, Stk}) end. diff --git a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl index bb928498c..52c23d459 100644 --- a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl @@ -103,7 +103,9 @@ init([Gateway, Ctx, _GwDscrptr]) -> }, case maps:get(enable, Config, true) of false -> - ?LOG(info, "Skipp to start ~ts gateway due to disabled", [GwName]), + ?SLOG(info, #{ msg => "skip_to_start_gateway_due_to_disabled" + , gateway_name => GwName + }), {ok, State}; true -> case cb_gateway_load(State) of @@ -160,13 +162,19 @@ handle_call(_Request, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'EXIT', Pid, Reason}, State = #state{child_pids = Pids}) -> +handle_info({'EXIT', Pid, Reason}, State = #state{name = Name, + child_pids = Pids}) -> case lists:member(Pid, Pids) of true -> - ?LOG(error, "Child process ~p exited: ~0p.", [Pid, Reason]), + ?SLOG(error, #{ msg => "child_process_exited" + , child => Pid + , reason => Reason + }), case Pids -- [Pid]of [] -> - ?LOG(error, "All child process exited!"), + ?SLOG(error, #{ msg => "gateway_all_children_process_existed" + , gateway_name => Name + }), {noreply, State#state{status = stopped, child_pids = [], gw_state = undefined}}; @@ -174,12 +182,18 @@ handle_info({'EXIT', Pid, Reason}, State = #state{child_pids = Pids}) -> {noreply, State#state{child_pids = RemainPids}} end; _ -> - ?LOG(error, "Unknown process exited ~p:~0p", [Pid, Reason]), + ?SLOG(error, #{ msg => "gateway_catch_a_unknown_process_exited" + , child => Pid + , reason => Reason + , gateway_name => Name + }), {noreply, State} end; handle_info(Info, State) -> - ?LOG(warning, "Unexcepted info: ~p", [Info]), + ?SLOG(warning, #{ msg => "unexcepted_info" + , info => Info + }), {noreply, State}. terminate(_Reason, State = #state{child_pids = Pids}) -> @@ -266,14 +280,18 @@ do_create_authn_chain(ChainName, AuthConf) -> case emqx_authentication:create_authenticator(ChainName, AuthConf) of {ok, _} -> ok; {error, Reason} -> - ?LOG(error, "Failed to create authenticator chain ~ts, " - "reason: ~p, config: ~p", - [ChainName, Reason, AuthConf]), + ?SLOG(error, #{ msg => "failed_to_create_authenticator" + , chain_name => ChainName + , reason => Reason + , config => AuthConf + }), throw({badauth, Reason}) end; {error, Reason} -> - ?LOG(error, "Falied to create authn chain ~ts, reason ~p", - [ChainName, Reason]), + ?SLOG(error, #{ msg => "failed_to_create_authn_chanin" + , chain_name => ChainName + , reason => Reason + }), throw({badauth, Reason}) end. @@ -293,8 +311,10 @@ do_deinit_authn(Names) -> ok -> ok; {error, {not_found, _}} -> ok; {error, Reason} -> - ?LOG(error, "Failed to clean authentication chain: ~ts, " - "reason: ~p", [ChainName, Reason]) + ?SLOG(error, #{ msg => "failed_to_clean_authn_chain" + , chain_name => ChainName + , reason => Reason + }) end end, Names). @@ -348,10 +368,12 @@ cb_gateway_unload(State = #state{name = GwName, stopped_at = erlang:system_time(millisecond)}} catch Class : Reason : Stk -> - ?LOG(error, "Failed to unload gateway (~0p, ~0p) crashed: " - "{~p, ~p}, stacktrace: ~0p", - [GwName, GwState, - Class, Reason, Stk]), + ?SLOG(error, #{ msg => "unload_gateway_crashed" + , gateway_name => GwName + , inner_state => GwState + , reason => {Class, Reason} + , stacktrace => Stk + }), {error, {Class, Reason, Stk}} after _ = do_deinit_authn(State#state.authns) @@ -388,10 +410,13 @@ cb_gateway_load(State = #state{name = GwName, end catch Class : Reason1 : Stk -> - ?LOG(error, "Failed to load ~ts gateway (~0p, ~0p) " - "crashed: {~p, ~p}, stacktrace: ~0p", - [GwName, Gateway, Ctx, - Class, Reason1, Stk]), + ?SLOG(error, #{ msg => "load_gateway_crashed" + , gateway_name => GwName + , gateway => Gateway + , ctx => Ctx + , reason => {Class, Reason1} + , stacktrace => Stk + }), {error, {Class, Reason1, Stk}} end. @@ -413,9 +438,12 @@ cb_gateway_update(Config, end catch Class : Reason1 : Stk -> - ?LOG(error, "Failed to update ~ts gateway to config: ~0p crashed: " - "{~p, ~p}, stacktrace: ~0p", - [GwName, Config, Class, Reason1, Stk]), + ?SLOG(error, #{ msg => "update_gateway_crashed" + , gateway_name => GwName + , new_config => Config + , reason => {Class, Reason1} + , stacktrace => Stk + }), {error, {Class, Reason1, Stk}} end. diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl index 49fe6c652..58b497cfc 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl @@ -263,7 +263,9 @@ handle_call(close, _From, Channel) -> handle_call({auth, ClientInfo, _Password}, _From, Channel = #channel{conn_state = connected}) -> - ?LOG(warning, "Duplicated authorized command, dropped ~p", [ClientInfo]), + ?SLOG(warning, #{ msg => "ingore_duplicated_authorized_command" + , request_clientinfo => ClientInfo + }), {reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel}; handle_call({auth, ClientInfo0, Password}, _From, Channel = #channel{ @@ -291,18 +293,25 @@ handle_call({auth, ClientInfo0, Password}, _From, SessFun ) of {ok, _Session} -> - ?LOG(debug, "Client ~ts (Username: '~ts') authorized successfully!", - [ClientId, Username]), + ?SLOG(debug, #{ msg => "client_login_succeed" + , clientid => ClientId + , username => Username + }), {reply, ok, [{event, connected}], ensure_connected(Channel1#channel{clientinfo = NClientInfo})}; {error, Reason} -> - ?LOG(warning, "Client ~ts (Username: '~ts') open session failed for ~0p", - [ClientId, Username, Reason]), + ?SLOG(warning, #{ msg => "client_login_failed" + , clientid => ClientId + , username => Username + , reason => Reason + }), {reply, {error, ?RESP_PERMISSION_DENY, Reason}, Channel} end; {error, Reason} -> - ?LOG(warning, "Client ~ts (Username: '~ts') login failed for ~0p", - [ClientId, Username, Reason]), + ?SLOG(warning, #{ msg => "client_login_failed" + , clientid => ClientId + , username => Username + , reason => Reason}), {reply, {error, ?RESP_PERMISSION_DENY, Reason}, Channel} end; @@ -363,7 +372,9 @@ handle_call(kick, _From, Channel) -> {shutdown, kicked, ok, Channel}; handle_call(Req, _From, Channel) -> - ?LOG(warning, "Unexpected call: ~p", [Req]), + ?SLOG(warning, #{ msg => "unexpected_call" + , call => Req + }), {reply, {error, unexpected_call}, Channel}. -spec handle_cast(any(), channel()) @@ -371,7 +382,9 @@ handle_call(Req, _From, Channel) -> | {ok, replies(), channel()} | {shutdown, Reason :: term(), channel()}. handle_cast(Req, Channel) -> - ?WARN("Unexpected call: ~p", [Req]), + ?SLOG(warning, #{ msg => "unexpected_call" + , call => Req + }), {ok, Channel}. -spec handle_info(any(), channel()) @@ -403,7 +416,9 @@ handle_info({hreply, FunName, {error, Reason}}, Channel) -> {shutdown, {error, {FunName, Reason}}, Channel}; handle_info(Info, Channel) -> - ?LOG(warning, "Unexpected info: ~p", [Info]), + ?SLOG(warning, #{ msg => "unexpected_info" + , info => Info + }), {ok, Channel}. -spec terminate(any(), channel()) -> channel(). diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl index 82adc8fb3..30b3c5922 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl @@ -82,32 +82,42 @@ handle_call(_Request, _From, State) -> handle_cast({rpc, Fun, Req, Options, From}, State = #state{streams = Streams}) -> case ensure_stream_opened(Fun, Options, Streams) of {error, Reason} -> - ?LOG(error, "CALL ~0p:~0p(~0p) failed, reason: ~0p", - [?CONN_ADAPTER_MOD, Fun, Options, Reason]), + ?SLOG(error, #{ msg => "request_grpc_server_failed" + , function => {?CONN_ADAPTER_MOD, Fun, Options} + , reason => Reason}), reply(From, Fun, {error, Reason}), {noreply, State#state{streams = Streams#{Fun => undefined}}}; {ok, Stream} -> case catch grpc_client:send(Stream, Req) of ok -> - ?LOG(debug, "Send to ~p method successfully, request: ~0p", - [Fun, Req]), + ?SLOG(debug, #{ msg => "send_grpc_request_succeed" + , function => {?CONN_ADAPTER_MOD, Fun} + , request => Req + }), reply(From, Fun, ok), {noreply, State#state{streams = Streams#{Fun => Stream}}}; {'EXIT', {not_found, _Stk}} -> %% Not found the stream, reopen it - ?LOG(info, "Can not find the old stream ref for ~s; " - "re-try with a new stream!", [Fun]), + ?SLOG(info, #{ msg => "cannt_find_old_stream_ref" + , function => {?CONN_ADAPTER_MOD, Fun} + }), handle_cast( {rpc, Fun, Req, Options, From}, State#state{streams = maps:remove(Fun, Streams)}); {'EXIT', {timeout, _Stk}} -> - ?LOG(error, "Send to ~p method timeout, request: ~0p", - [Fun, Req]), + ?SLOG(error, #{ msg => "send_grpc_request_timeout" + , function => {?CONN_ADAPTER_MOD, Fun} + , request => Req + }), reply(From, Fun, {error, timeout}), {noreply, State#state{streams = Streams#{Fun => Stream}}}; - {'EXIT', {Reason1, _Stk}} -> - ?LOG(error, "Send to ~p method failure, request: ~0p, " - "stacktrace: ~0p", [Fun, Req, _Stk]), + {'EXIT', {Reason1, Stk}} -> + ?SLOG(error, #{ msg => "send_grpc_request_failed" + , function => {?CONN_ADAPTER_MOD, Fun} + , request => Req + , error => Reason1 + , stacktrace => Stk + }), reply(From, Fun, {error, Reason1}), {noreply, State#state{streams = Streams#{Fun => undefined}}} end diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_gsvr.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_gsvr.erl index 04f3dea1c..3832f67d8 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_gsvr.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_gsvr.erl @@ -44,14 +44,20 @@ -> {ok, emqx_exproto_pb:code_response(), grpc:metadata()} | {error, grpc_cowboy_h:error_response()}. send(Req = #{conn := Conn, bytes := Bytes}, Md) -> - ?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]), + ?SLOG(debug, #{ msg => "recv_grpc_function_call" + , function => ?FUNCTION_NAME + , request => Req + }), {ok, response(call(Conn, {send, Bytes})), Md}. -spec close(emqx_exproto_pb:close_socket_request(), grpc:metadata()) -> {ok, emqx_exproto_pb:code_response(), grpc:metadata()} | {error, grpc_cowboy_h:error_response()}. close(Req = #{conn := Conn}, Md) -> - ?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]), + ?SLOG(debug, #{ msg => "recv_grpc_function_call" + , function => ?FUNCTION_NAME + , request => Req + }), {ok, response(call(Conn, close)), Md}. -spec authenticate(emqx_exproto_pb:authenticate_request(), grpc:metadata()) @@ -60,7 +66,10 @@ close(Req = #{conn := Conn}, Md) -> authenticate(Req = #{conn := Conn, password := Password, clientinfo := ClientInfo}, Md) -> - ?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]), + ?SLOG(debug, #{ msg => "recv_grpc_function_call" + , function => ?FUNCTION_NAME + , request => Req + }), case validate(clientinfo, ClientInfo) of false -> {ok, response({error, ?RESP_REQUIRED_PARAMS_MISSED}), Md}; @@ -73,10 +82,18 @@ authenticate(Req = #{conn := Conn, | {error, grpc_cowboy_h:error_response()}. start_timer(Req = #{conn := Conn, type := Type, interval := Interval}, Md) when Type =:= 'KEEPALIVE' andalso Interval > 0 -> - ?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]), + ?SLOG(debug, #{ msg => "recv_grpc_function_call" + , function => ?FUNCTION_NAME + , request => Req + }), + {ok, response(call(Conn, {start_timer, keepalive, Interval})), Md}; start_timer(Req, Md) -> - ?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]), + ?SLOG(debug, #{ msg => "recv_grpc_function_call" + , function => ?FUNCTION_NAME + , request => Req + }), + {ok, response({error, ?RESP_PARAMS_TYPE_ERROR}), Md}. -spec publish(emqx_exproto_pb:publish_request(), grpc:metadata()) @@ -84,11 +101,18 @@ start_timer(Req, Md) -> | {error, grpc_cowboy_h:error_response()}. publish(Req = #{conn := Conn, topic := Topic, qos := Qos, payload := Payload}, Md) when ?IS_QOS(Qos) -> - ?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]), + ?SLOG(debug, #{ msg => "recv_grpc_function_call" + , function => ?FUNCTION_NAME + , request => Req + }), + {ok, response(call(Conn, {publish, Topic, Qos, Payload})), Md}; publish(Req, Md) -> - ?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]), + ?SLOG(debug, #{ msg => "recv_grpc_function_call" + , function => ?FUNCTION_NAME + , request => Req + }), {ok, response({error, ?RESP_PARAMS_TYPE_ERROR}), Md}. -spec subscribe(emqx_exproto_pb:subscribe_request(), grpc:metadata()) @@ -96,18 +120,27 @@ publish(Req, Md) -> | {error, grpc_cowboy_h:error_response()}. subscribe(Req = #{conn := Conn, topic := Topic, qos := Qos}, Md) when ?IS_QOS(Qos) -> - ?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]), + ?SLOG(debug, #{ msg => "recv_grpc_function_call" + , function => ?FUNCTION_NAME + , request => Req + }), {ok, response(call(Conn, {subscribe_from_client, Topic, Qos})), Md}; subscribe(Req, Md) -> - ?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]), + ?SLOG(debug, #{ msg => "recv_grpc_function_call" + , function => ?FUNCTION_NAME + , request => Req + }), {ok, response({error, ?RESP_PARAMS_TYPE_ERROR}), Md}. -spec unsubscribe(emqx_exproto_pb:unsubscribe_request(), grpc:metadata()) -> {ok, emqx_exproto_pb:code_response(), grpc:metadata()} | {error, grpc_cowboy_h:error_response()}. unsubscribe(Req = #{conn := Conn, topic := Topic}, Md) -> - ?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]), + ?SLOG(debug, #{ msg => "recv_grpc_function_call" + , function => ?FUNCTION_NAME + , request => Req + }), {ok, response(call(Conn, {unsubscribe_from_client, Topic})), Md}. %%-------------------------------------------------------------------- @@ -130,9 +163,12 @@ call(ConnStr, Req) -> exit : timeout -> {error, ?RESP_UNKNOWN, <<"Connection is not answered">>}; Class : Reason : Stk-> - ?LOG(error, "Call ~p crashed: {~0p, ~0p}, " - "stacktrace: ~0p", - [Class, Reason, Stk]), + ?SLOG(error, #{ msg => "call_conn_process_crashed" + , request => Req + , conn_str=> ConnStr + , reason => {Class, Reason} + , stacktrace => Stk + }), {error, ?RESP_UNKNOWN, <<"Unkwown crashs">>} end. diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl index f78f9a908..6d100b943 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl @@ -172,14 +172,18 @@ handle_call({lookup_cmd, Path, Type}, _From, #channel{session = Session} = Chann {reply, {ok, Result}, Channel}; handle_call(Req, _From, Channel) -> - ?LOG(error, "Unexpected call: ~p", [Req]), + ?SLOG(error, #{ msg => "unexpected_call" + , call => Req + }), {reply, ignored, Channel}. %%-------------------------------------------------------------------- %% Handle Cast %%-------------------------------------------------------------------- handle_cast(Req, Channel) -> - ?LOG(error, "Unexpected cast: ~p", [Req]), + ?SLOG(error, #{ msg => "unexpected_cast" + , cast => Req + }), {ok, Channel}. %%-------------------------------------------------------------------- @@ -190,7 +194,9 @@ handle_info({subscribe, _AutoSubs}, Channel) -> {ok, Channel}; handle_info(Info, Channel) -> - ?LOG(error, "Unexpected info: ~p", [Info]), + ?SLOG(error, #{ msg => "unexpected_info" + , info => Info + }), {ok, Channel}. %%-------------------------------------------------------------------- @@ -283,7 +289,9 @@ check_lwm2m_version(#coap_message{options = Opts}, }, {ok, Channel#channel{conninfo = NConnInfo}}; true -> - ?LOG(error, "Reject REGISTER due to unsupported version: ~0p", [Ver]), + ?SLOG(error, #{ msg => "reject_REGISTRE_request" + , reason => {unsupported_version, Ver} + }), {error, "invalid lwm2m version", Channel} end. @@ -313,7 +321,9 @@ enrich_clientinfo(#coap_message{options = Options} = Msg, {ok, NClientInfo} = fix_mountpoint(Msg, ClientInfo), {ok, Channel#channel{clientinfo = NClientInfo}}; _ -> - ?LOG(error, "Reject REGISTER due to wrong parameters, Query=~p", [Query]), + ?SLOG(error, #{ msg => "reject_REGISTER_request" + , reason => {wrong_paramters, Query} + }), {error, "invalid queries", Channel} end. @@ -329,8 +339,11 @@ auth_connect(_Input, Channel = #channel{ctx = Ctx, {ok, Channel#channel{clientinfo = NClientInfo, with_context = with_context(Ctx, ClientInfo)}}; {error, Reason} -> - ?LOG(warning, "Client ~ts (Username: '~ts') login failed for ~0p", - [ClientId, Username, Reason]), + ?SLOG(warning, #{ msg => "client_login_failed" + , clientid => ClientId + , username => Username + , reason => Reason + }), {error, Reason} end. @@ -374,7 +387,9 @@ process_connect(Channel = #channel{ctx = Ctx, NewResult1 = NewResult0#{events => [{event, connected}]}, iter(Iter, maps:merge(Result, NewResult1), Channel); {error, Reason} -> - ?LOG(error, "Failed to open session du to ~p", [Reason]), + ?SLOG(error, #{ msg => "falied_to_open_session" + , reason => Reason + }), iter(Iter, reply({error, bad_request}, Msg, Result), Channel) end. @@ -398,17 +413,24 @@ with_context(publish, [Topic, Msg], Ctx, ClientInfo) -> allow -> emqx:publish(Msg); _ -> - ?LOG(error, "topic:~p not allow to publish ", [Topic]) + ?SLOG(error, #{ msg => "publish_denied" + , topic => Topic + }) end; with_context(subscribe, [Topic, Opts], Ctx, #{username := Username} = ClientInfo) -> case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, Topic) of allow -> run_hooks(Ctx, 'session.subscribed', [ClientInfo, Topic, Opts]), - ?LOG(debug, "Subscribe topic: ~0p, Opts: ~0p, EndpointName: ~0p", [Topic, Opts, Username]), + ?SLOG(debug, #{ msg => "subscribe_topic_succeed" + , topic => Topic + , endpoint_name => Username + }), emqx:subscribe(Topic, Username, Opts); _ -> - ?LOG(error, "Topic: ~0p not allow to subscribe", [Topic]) + ?SLOG(error, #{ msg => "subscribe_denied" + , topic => Topic + }) end; with_context(metrics, Name, Ctx, _ClientInfo) -> diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_cmd.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_cmd.erl index e17a83195..1ee27228b 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_cmd.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_cmd.erl @@ -168,7 +168,10 @@ read_resp_to_mqtt({ok, SuccessCode}, CoapPayload, Format, Ref) -> catch error:not_implemented -> make_response(not_implemented, Ref); _:Ex:_ST -> - ?LOG(error, "~0p, bad payload format: ~0p", [Ex, CoapPayload]), + ?SLOG(error, #{ msg => "bad_payload_format" + , payload => CoapPayload + , reason => Ex + , stacktrace => _ST}), make_response(bad_request, Ref) end. diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_message.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_message.erl index 6d155f9bd..70844e6d6 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_message.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_message.erl @@ -25,8 +25,6 @@ -include("src/lwm2m/include/emqx_lwm2m.hrl"). --define(LOG(Level, Format, Args), logger:Level("LWM2M-JSON: " ++ Format, Args)). - tlv_to_json(BaseName, TlvData) -> DecodedTlv = emqx_lwm2m_tlv:parse(TlvData), ObjectId = object_id(BaseName), diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl index 40dc34ba6..dbb6b7566 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl @@ -405,7 +405,7 @@ send_auto_observe(RegInfo, Session) -> ObjectList = maps:get(<<"objectList">>, RegInfo, []), observe_object_list(AlternatePath, ObjectList, Session); _ -> - ?LOG(info, "Auto Observe Disabled", []), + ?SLOG(info, #{ msg => "skip_auto_observe_due_to_disabled"}), Session end. @@ -435,7 +435,10 @@ observe_object(AlternatePath, ObjectPath, Session) -> deliver_auto_observe_to_coap(AlternatePath, Payload, Session). deliver_auto_observe_to_coap(AlternatePath, TermData, Session) -> - ?LOG(info, "Auto Observe, SEND To CoAP, AlternatePath=~0p, Data=~0p ", [AlternatePath, TermData]), + ?SLOG(info, #{ msg => "send_auto_observe" + , path => AlternatePath + , data => TermData + }), {Req, Ctx} = emqx_lwm2m_cmd:mqtt_to_coap(AlternatePath, TermData), maybe_do_deliver_to_coap(Ctx, Req, 0, false, Session). @@ -568,11 +571,15 @@ send_to_coap(#session{queue = Queue} = Session) -> end. send_to_coap(Ctx, Req, Session) -> - ?LOG(debug, "Deliver To CoAP, CoapRequest: ~0p", [Req]), + ?SLOG(debug, #{ msg => "deliver_to_coap" + , coap_request => Req + }), out_to_coap(Ctx, Req, Session#session{wait_ack = Ctx}). send_msg_not_waiting_ack(Ctx, Req, Session) -> - ?LOG(debug, "Deliver To CoAP not waiting ack, CoapRequest: ~0p", [Req]), + ?SLOG(debug, #{ msg => "deliver_to_coap_and_no_ack" + , coap_request => Req + }), %% cmd_sent(Ref, LwM2MOpts). out_to_coap(Ctx, Req, Session). @@ -636,8 +643,11 @@ deliver_to_coap(AlternatePath, JsonData, MQTT, CacheMode, WithContext, Session) deliver_to_coap(AlternatePath, TermData, MQTT, CacheMode, WithContext, Session) catch ExClass:Error:ST -> - ?LOG(error, "deliver_to_coap - Invalid JSON: ~0p, Exception: ~0p, stacktrace: ~0p", - [JsonData, {ExClass, Error}, ST]), + ?SLOG(error, #{ msg => "invaild_json_format_to_deliver" + , data => JsonData + , reason => {ExClass, Error} + , stacktrace => ST + }), WithContext(metrics, 'delivery.dropped'), Session end; diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_tlv.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_tlv.erl index dd1ecddda..94ea31bf8 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_tlv.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_tlv.erl @@ -27,8 +27,6 @@ -include("src/lwm2m/include/emqx_lwm2m.hrl"). --define(LOG(Level, Format, Args), logger:Level("LWM2M-TLV: " ++ Format, Args)). - -define(TLV_TYPE_OBJECT_INSTANCE, 0). -define(TLV_TYPE_RESOURCE_INSTANCE, 1). -define(TLV_TYPE_MULTIPLE_RESOURCE, 2). @@ -39,8 +37,6 @@ -define(TLV_LEGNTH_16_BIT, 2). -define(TLV_LEGNTH_24_BIT, 3). - - %---------------------------------------------------------------------------------------------------------------------------------------- % [#{tlv_object_instance := Id11, value := Value11}, #{tlv_object_instance := Id12, value := Value12}, ...] % where Value11 and Value12 is a list: diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_xml_object.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_xml_object.erl index a4ec27413..d744a23f9 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_xml_object.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_xml_object.erl @@ -28,9 +28,6 @@ , get_resource_operations/2 ]). --define(LOG(Level, Format, Args), - logger:Level("LWM2M-OBJ: " ++ Format, Args)). - % This module is for future use. Disabled now. get_obj_def(ObjectIdInt, true) -> @@ -50,7 +47,6 @@ get_object_and_resource_id(ResourceNameBinary, ObjDefinition) -> ResourceNameString = binary_to_list(ResourceNameBinary), [#xmlText{value=ObjectId}] = xmerl_xpath:string("ObjectID/text()", ObjDefinition), [#xmlAttribute{value=ResourceId}] = xmerl_xpath:string("Resources/Item/Name[.=\""++ResourceNameString++"\"]/../@ID", ObjDefinition), - ?LOG(debug, "get_object_and_resource_id ObjectId=~p, ResourceId=~p", [ObjectId, ResourceId]), {ObjectId, ResourceId}. get_resource_type(ResourceIdInt, ObjDefinition) -> diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_xml_object_db.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_xml_object_db.erl index ec7c83de1..3cef3c19e 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_xml_object_db.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_xml_object_db.erl @@ -18,6 +18,7 @@ -include_lib("emqx_gateway/src/lwm2m/include/emqx_lwm2m.hrl"). -include_lib("xmerl/include/xmerl.hrl"). +-include_lib("emqx/include/logger.hrl"). % This module is for future use. Disabled now. @@ -37,9 +38,6 @@ , code_change/3 ]). --define(LOG(Level, Format, Args), - logger:Level("LWM2M-OBJ-DB: " ++ Format, Args)). - -define(LWM2M_OBJECT_DEF_TAB, lwm2m_object_def_tab). -define(LWM2M_OBJECT_NAME_TO_ID_TAB, lwm2m_object_name_to_id_tab). @@ -130,7 +128,11 @@ load_loop([FileName|T]) -> [#xmlText{value=Name}] = xmerl_xpath:string("Name/text()", ObjectXml), ObjectId = list_to_integer(ObjectIdString), NameBinary = list_to_binary(Name), - ?LOG(debug, "load_loop FileName=~p, ObjectId=~p, Name=~p", [FileName, ObjectId, NameBinary]), + ?SLOG(debug, #{ msg => "load_object_succeed" + , filename => FileName + , object_id => ObjectId + , object_name => NameBinary + }), ets:insert(?LWM2M_OBJECT_DEF_TAB, {ObjectId, ObjectXml}), ets:insert(?LWM2M_OBJECT_NAME_TO_ID_TAB, {NameBinary, ObjectId}), load_loop(T). diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_broadcast.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_broadcast.erl index f1fb4eeb1..dc967681a 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_broadcast.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_broadcast.erl @@ -57,18 +57,24 @@ init([GwId, Port]) -> sock = Sock, port = Port, duration = Duration})}. handle_call(Req, _From, State) -> - ?LOG(error, "Unexpected request: ~p", [Req]), + ?SLOG(error, #{ msg => "unexpected_call" + , call => Req + }), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "Unexpected msg: ~p", [Msg]), + ?SLOG(error, #{ msg => "unexpected_cast" + , cast => Msg + }), {noreply, State}. handle_info(broadcast_advertise, State) -> {noreply, ensure_advertise(State), hibernate}; handle_info(Info, State) -> - ?LOG(error, "Unexpected info: ~p", [Info]), + ?SLOG(error, #{ msg => "unexpected_info" + , info => Info + }), {noreply, State}. terminate(_Reason, #state{tref = Timer}) -> @@ -90,7 +96,9 @@ send_advertise(#state{gwid = GwId, sock = Sock, port = Port, addrs = Addrs, duration = Duration}) -> Data = emqx_sn_frame:serialize_pkt(?SN_ADVERTISE_MSG(GwId, Duration), #{}), lists:foreach(fun(Addr) -> - ?LOG(debug, "SEND SN_ADVERTISE to ~p~n", [Addr]), + ?SLOG(debug, #{ msg => "send_ADVERTISE_msg" + , address => Addr + }), gen_udp:send(Sock, Addr, Port, Data) end, Addrs). diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index c745f72f6..cc3e8e053 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -287,8 +287,11 @@ auth_connect(_Packet, Channel = #channel{ctx = Ctx, {ok, NClientInfo} -> {ok, Channel#channel{clientinfo = NClientInfo}}; {error, Reason} -> - ?LOG(warning, "Client ~ts (Username: '~ts') login failed for ~0p", - [ClientId, Username, Reason]), + ?SLOG(warning, #{ msg => "client_login_failed" + , clientid => ClientId + , username => Username + , reason => Reason + }), %% FIXME: ReasonCode? {error, Reason} end. @@ -321,7 +324,9 @@ process_connect(Channel = #channel{ handle_out(connack, ?SN_RC_ACCEPTED, Channel#channel{session = Session}); {error, Reason} -> - ?LOG(error, "Failed to open session due to ~p", [Reason]), + ?SLOG(error, #{ msg => "failed_to_open_session" + , reason => Reason + }), handle_out(connack, ?SN_RC_FAILED_SESSION, Channel) end. @@ -383,19 +388,24 @@ handle_in(?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1, false -> ok end, - ?LOG(debug, "Client id=~p receives a publish with QoS=-1 in idle mode!", - [?NEG_QOS_CLIENT_ID]), + ?SLOG(debug, #{ msg => "receive_qo3_message_in_idle_mode" + , topic => TopicName + , data => Data + }), {ok, Channel}; handle_in(Pkt = #mqtt_sn_message{type = Type}, Channel = #channel{conn_state = idle}) when Type /= ?SN_CONNECT -> - ?LOG(warning, "Receive unknown packet ~0p in idle state", [Pkt]), + ?SLOG(warning, #{ msg => "receive_unknown_packet_in_idle_state" + , packet => Pkt + }), shutdown(normal, Channel); handle_in(?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId), Channel = #channel{conn_state = connecting}) -> - ?LOG(warning, "Receive connect packet in connecting state"), + ?SLOG(warning, #{ msg => "receive_connect_packet_in_connecting_state" + }), {ok, Channel}; handle_in(?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId), @@ -461,12 +471,17 @@ handle_in(?SN_REGISTER_MSG(_TopicId, MsgId, TopicName), clientinfo = #{clientid := ClientId}}) -> case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of TopicId when is_integer(TopicId) -> - ?LOG(debug, "register TopicName=~p, TopicId=~p", - [TopicName, TopicId]), + ?SLOG(debug, #{ msg => "registered_topic_name" + , topic_name => TopicName + , topic_id => TopicId + }), AckPacket = ?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED), {ok, {outgoing, AckPacket}, Channel}; {error, too_large} -> - ?LOG(error, "TopicId is full! TopicName=~p", [TopicName]), + ?SLOG(error, #{ msg => "register_topic_failed" + , topic_name => TopicName + , reason => topic_id_fulled + }), AckPacket = ?SN_REGACK_MSG( ?SN_INVALID_TOPIC_ID, MsgId, @@ -474,8 +489,10 @@ handle_in(?SN_REGISTER_MSG(_TopicId, MsgId, TopicName), ), {ok, {outgoing, AckPacket}, Channel}; {error, wildcard_topic} -> - ?LOG(error, "wildcard topic can not be registered! TopicName=~p", - [TopicName]), + ?SLOG(error, #{ msg => "register_topic_failed" + , topic_name => TopicName + , reason => not_support_wildcard_topic + }), AckPacket = ?SN_REGACK_MSG( ?SN_INVALID_TOPIC_ID, MsgId, @@ -520,13 +537,17 @@ handle_in(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode), Publishes, Channel#channel{session = NSession}); {error, ?RC_PACKET_IDENTIFIER_IN_USE} -> - ?LOG(warning, "The PUBACK MsgId ~w is inuse.", - [MsgId]), + ?SLOG(warning, #{ msg => "commit_puback_failed" + , msg_id => MsgId + , reason => msg_id_inused + }), ok = metrics_inc(Ctx, 'packets.puback.inuse'), {ok, Channel}; {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> - ?LOG(warning, "The PUBACK MsgId ~w is not found.", - [MsgId]), + ?SLOG(warning, #{ msg => "commit_puback_failed" + , msg_id => MsgId + , reason => not_found + }), ok = metrics_inc(Ctx, 'packets.puback.missed'), {ok, Channel} end; @@ -543,7 +564,9 @@ handle_in(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode), {ok, {outgoing, RegPkt}, Channel} end; _ -> - ?LOG(error, "CAN NOT handle PUBACK ReturnCode=~p", [ReturnCode]), + ?SLOG(error, #{ msg => "cannt_handle_PUBACK" + , return_code => ReturnCode + }), {ok, Channel} end; @@ -557,11 +580,17 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBREC, MsgId), NChannel = Channel#channel{session = NSession}, handle_out(pubrel, MsgId, NChannel); {error, ?RC_PACKET_IDENTIFIER_IN_USE} -> - ?LOG(warning, "The PUBREC MsgId ~w is inuse.", [MsgId]), + ?SLOG(warning, #{ msg => "commit_PUBREC_failed" + , msg_id => MsgId + , reason => msg_id_inused + }), ok = metrics_inc(Ctx, 'packets.pubrec.inuse'), handle_out(pubrel, MsgId, Channel); {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> - ?LOG(warning, "The PUBREC ~w is not found.", [MsgId]), + ?SLOG(warning, #{ msg => "commit_PUBREC_failed" + , msg_id => MsgId + , reason => not_found + }), ok = metrics_inc(Ctx, 'packets.pubrec.missed'), handle_out(pubrel, MsgId, Channel) end; @@ -573,7 +602,10 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBREL, MsgId), NChannel = Channel#channel{session = NSession}, handle_out(pubcomp, MsgId, NChannel); {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> - ?LOG(warning, "The PUBREL MsgId ~w is not found.", [MsgId]), + ?SLOG(warning, #{ msg => "commit_PUBREL_failed" + , msg_id => MsgId + , reason => not_found + }), ok = metrics_inc(Ctx, 'packets.pubrel.missed'), handle_out(pubcomp, MsgId, Channel) end; @@ -587,10 +619,17 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId), handle_out(publish, Publishes, Channel#channel{session = NSession}); {error, ?RC_PACKET_IDENTIFIER_IN_USE} -> + ?SLOG(warning, #{ msg => "commit_PUBCOMP_failed" + , msg_id => MsgId + , reason => msg_id_inused + }), ok = metrics_inc(Ctx, 'packets.pubcomp.inuse'), {ok, Channel}; {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> - ?LOG(warning, "The PUBCOMP MsgId ~w is not found", [MsgId]), + ?SLOG(warning, #{ msg => "commit_PUBCOMP_failed" + , msg_id => MsgId + , reason => not_found + }), ok = metrics_inc(Ctx, 'packets.pubcomp.missed'), {ok, Channel} end; @@ -626,8 +665,10 @@ handle_in(UnsubPkt = ?SN_UNSUBSCRIBE_MSG(_, MsgId, TopicIdOrName), UnsubAck = ?SN_UNSUBACK_MSG(MsgId), {ok, outgoing_and_update(UnsubAck), NChannel}; {error, Reason, NChannel} -> - ?LOG(warning, "Unsubscribe ~p failed: ~0p", - [TopicIdOrName, Reason]), + ?SLOG(warning, #{ msg => "unsubscribe_failed" + , topic => TopicIdOrName + , reason => Reason + }), %% XXX: Even if it fails, the reply is successful. UnsubAck = ?SN_UNSUBACK_MSG(MsgId), {ok, {outgoing, UnsubAck}, NChannel} @@ -674,7 +715,9 @@ handle_in(?SN_WILLMSGUPD_MSG(Payload), handle_in({frame_error, Reason}, Channel = #channel{conn_state = _ConnState}) -> - ?LOG(error, "Unexpected frame error: ~p", [Reason]), + ?SLOG(error, #{ msg => "unexpected_frame_error" + , reason => Reason + }), shutdown(Reason, Channel). after_message_acked(ClientInfo, Msg, #channel{ctx = Ctx}) -> @@ -689,13 +732,15 @@ outgoing_and_update(Pkt) -> %%-------------------------------------------------------------------- %% Handle Publish -check_qos3_enable(?SN_PUBLISH_MSG(Flags, _, _, _), +check_qos3_enable(?SN_PUBLISH_MSG(Flags, TopicId, _MsgId, Data), #channel{enable_qos3 = EnableQoS3}) -> #mqtt_sn_flags{qos = QoS} = Flags, case EnableQoS3 =:= false andalso QoS =:= ?QOS_NEG1 of true -> - ?LOG(debug, "The enable_qos3 is false, ignore the received " - "publish with QoS=-1 in connected mode!"), + ?SLOG(debug, #{ msg => "ignore_msg_due_to_qos3_disabled" + , topic_id => TopicId + , data => Data + }), {error, ?SN_RC_NOT_SUPPORTED}; false -> ok @@ -781,8 +826,9 @@ do_publish(TopicId, MsgId, Msg = #message{qos = ?QOS_2}, handle_out(puback , {TopicId, MsgId, ?SN_RC_NOT_SUPPORTED}, Channel); {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} -> - ?LOG(warning, "Dropped the qos2 packet ~w " - "due to awaiting_rel is full.", [MsgId]), + ?SLOG(warning, #{ msg => "dropped_the_qos2_packet_due_to_awaiting_rel_full" + , msg_id => MsgId + }), ok = metrics_inc(Ctx, 'packets.publish.dropped'), handle_out(puback, {TopicId, MsgId, ?SN_RC_CONGESTION}, Channel) end. @@ -860,8 +906,10 @@ run_client_subs_hook({TopicId, TopicName, QoS}, case run_hooks(Ctx, 'client.subscribe', [ClientInfo, #{}], TopicFilters) of [] -> - ?LOG(warning, "Skip to subscribe ~ts, " - "due to 'client.subscribe' denied!", [TopicName]), + ?SLOG(warning, #{ msg => "skip_to_subscribe" + , topic_name => TopicName + , reason => "'client.subscribe' filtered it" + }), {error, ?SN_EXCEED_LIMITATION}; [{NTopicName, NSubOpts}|_] -> {ok, {TopicId, NTopicName, NSubOpts}, Channel} @@ -879,8 +927,10 @@ do_subscribe({TopicId, TopicName, SubOpts}, {ok, {TopicId, NTopicName, NSubOpts}, Channel#channel{session = NSession}}; {error, ?RC_QUOTA_EXCEEDED} -> - ?LOG(warning, "Cannot subscribe ~ts due to ~ts.", - [TopicName, emqx_reason_codes:text(?RC_QUOTA_EXCEEDED)]), + ?SLOG(warning, #{ msg => "cannt_subscribe_due_to_quota_exceeded" + , topic_name => TopicName + , reason => emqx_reason_codes:text(?RC_QUOTA_EXCEEDED) + }), {error, ?SN_EXCEED_LIMITATION} end. @@ -1185,7 +1235,9 @@ handle_call(discard, _From, Channel) -> % reply(ok, Channel#channel{quota = Quota}); handle_call(Req, _From, Channel) -> - ?LOG(error, "Unexpected call: ~p", [Req]), + ?SLOG(error, #{ msg => "unexpected_call" + , call => Req + }), reply(ignored, Channel). %%-------------------------------------------------------------------- @@ -1225,7 +1277,9 @@ handle_info({sock_closed, Reason}, handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) -> - ?LOG(error, "Unexpected sock_closed: ~p", [Reason]), + ?SLOG(error, #{ msg => "unexpected_sock_closed" + , reason => Reason + }), {ok, Channel}; handle_info(clean_authz_cache, Channel) -> @@ -1233,7 +1287,9 @@ handle_info(clean_authz_cache, Channel) -> {ok, Channel}; handle_info(Info, Channel) -> - ?LOG(error, "Unexpected info: ~p", [Info]), + ?SLOG(error, #{ msg => "unexpected_info" + , info => Info + }), {ok, Channel}. %%-------------------------------------------------------------------- @@ -1389,7 +1445,9 @@ handle_timeout(_TRef, expire_asleep, Channel) -> shutdown(asleep_timeout, Channel); handle_timeout(_TRef, Msg, Channel) -> - ?LOG(error, "Unexpected timeout: ~p~n", [Msg]), + ?SLOG(error, #{ msg => "unexpected_timeout" + , timeout_msg => Msg + }), {ok, Channel}. %%-------------------------------------------------------------------- diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl index 2534eee26..c1f76e95a 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl @@ -22,9 +22,7 @@ -behaviour(gen_server). -include("src/mqttsn/include/emqx_sn.hrl"). - --define(LOG(Level, Format, Args), - emqx_logger:Level("MQTT-SN(registry): " ++ Format, Args)). +-include_lib("emqx/include/logger.hrl"). -export([ start_link/2 ]). @@ -215,15 +213,21 @@ handle_call(name, _From, State = #state{tabname = Tab}) -> {reply, {Tab, self()}, State}; handle_call(Req, _From, State) -> - ?LOG(error, "Unexpected request: ~p", [Req]), + ?SLOG(error, #{ msg => "unexpected_call" + , call => Req + }), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "Unexpected msg: ~p", [Msg]), + ?SLOG(error, #{ msg => "unexpected_cast" + , cast => Msg + }), {noreply, State}. handle_info(Info, State) -> - ?LOG(error, "Unexpected info: ~p", [Info]), + ?SLOG(error, #{ msg => "unexpected_info" + , info => Info + }), {noreply, State}. terminate(_Reason, _State) -> diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl index 1165898e9..3f77aa8eb 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl @@ -280,8 +280,11 @@ auth_connect(_Packet, Channel = #channel{ctx = Ctx, {ok, NClientInfo} -> {ok, Channel#channel{clientinfo = NClientInfo}}; {error, Reason} -> - ?LOG(warning, "Client ~ts (Username: '~ts') login failed for ~0p", - [ClientId, Username, Reason]), + ?SLOG(warning, #{ msg => "client_login_failed" + , clientid => ClientId + , username => Username + , reason => Reason + }), {error, Reason} end. @@ -315,7 +318,9 @@ process_connect(Channel = #channel{ {<<"heart-beat">>, reverse_heartbeats(Heartbeat)}], handle_out(connected, Headers, Channel#channel{session = Session}); {error, Reason} -> - ?LOG(error, "Failed to open session du to ~p", [Reason]), + ?SLOG(error, #{ msg => "failed_to_open_session" + , reason => Reason + }), Headers = [{<<"version">>, <<"1.0,1.1,1.2">>}, {<<"content-type">>, <<"text/plain">>}], handle_out(connerr, {Headers, undefined, <<"Not Authenticated">>}, Channel) @@ -403,8 +408,10 @@ handle_in(?PACKET(?CMD_SUBSCRIBE, Headers), handle_out(receipt, receipt_id(Headers), NChannel1) end; {error, ErrMsg, NChannel} -> - ?LOG(error, "Failed to subscribe topic ~ts, reason: ~ts", - [Topic, ErrMsg]), + ?SLOG(error, #{ msg => "failed_top_subscribe_topic" + , topic => Topic + , reason => ErrMsg + }), handle_out(error, {receipt_id(Headers), ErrMsg}, NChannel) end; @@ -507,7 +514,9 @@ handle_in(?PACKET(?CMD_DISCONNECT, Headers), Channel) -> shutdown_with_recepit(normal, receipt_id(Headers), Channel); handle_in({frame_error, Reason}, Channel = #channel{conn_state = _ConnState}) -> - ?LOG(error, "Unexpected frame error: ~p", [Reason]), + ?SLOG(error, #{ msg => "unexpected_frame_error" + , reason => Reason + }), shutdown(Reason, Channel). with_transaction(Headers, Channel = #channel{transaction = Trans}, Fun) -> @@ -653,8 +662,10 @@ handle_call({subscribe, Topic, SubOpts}, _From, NChannel1 = NChannel#channel{subscriptions = NSubs}, reply(ok, NChannel1); {error, ErrMsg, NChannel} -> - ?LOG(error, "Failed to subscribe topic ~ts, reason: ~ts", - [Topic, ErrMsg]), + ?SLOG(error, #{ msg => "failed_to_subscribe_topic" + , topic => Topic + , reason => ErrMsg + }), reply({error, ErrMsg}, NChannel) end end; @@ -715,7 +726,9 @@ handle_call(list_authz_cache, _From, Channel) -> % reply(ok, Channel#channel{quota = Quota}); handle_call(Req, _From, Channel) -> - ?LOG(error, "Unexpected call: ~p", [Req]), + ?SLOG(error, #{ msg => "unexpected_call" + , call => Req + }), reply(ignored, Channel). %%-------------------------------------------------------------------- @@ -755,7 +768,9 @@ handle_info({sock_closed, Reason}, handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) -> - ?LOG(error, "Unexpected sock_closed: ~p", [Reason]), + ?SLOG(error, #{ msg => "unexpected_sock_closed" + , reason => Reason + }), {ok, Channel}; handle_info(clean_authz_cache, Channel) -> @@ -763,7 +778,9 @@ handle_info(clean_authz_cache, Channel) -> {ok, Channel}; handle_info(Info, Channel) -> - ?LOG(error, "Unexpected info: ~p", [Info]), + ?SLOG(error, #{ msg => "unexpected_info" + , info => Info + }), {ok, Channel}. %%-------------------------------------------------------------------- @@ -828,9 +845,10 @@ handle_deliver(Delivers, }, [Frame|Acc]; false -> - ?LOG(error, "Dropped message ~0p due to not found " - "subscription id for ~ts", - [Message, emqx_message:topic(Message)]), + ?SLOG(error, #{ msg => "dropped_message_due_to_subscription_not_found" + , message => Message + , topic => emqx_message:topic(Message) + }), metrics_inc('delivery.dropped', Channel), metrics_inc('delivery.dropped.no_subid', Channel), Acc