chore(gw): use SLOG to replace LOG
This commit is contained in:
parent
2416fa4e13
commit
69df027ee9
|
@ -476,7 +476,7 @@ handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
|
||||||
handle_info({sock_error, Reason}, State);
|
handle_info({sock_error, Reason}, State);
|
||||||
|
|
||||||
handle_msg({close, Reason}, State) ->
|
handle_msg({close, Reason}, State) ->
|
||||||
?LOG(debug, "Force to close the socket due to ~p", [Reason]),
|
?SLOG(debug, #{msg => "force_socket_close", reason => Reason}),
|
||||||
handle_info({sock_closed, Reason}, close_socket(State));
|
handle_info({sock_closed, Reason}, close_socket(State));
|
||||||
|
|
||||||
handle_msg({event, connected}, State = #state{
|
handle_msg({event, connected}, State = #state{
|
||||||
|
@ -525,7 +525,7 @@ handle_msg(Msg, State) ->
|
||||||
terminate(Reason, State = #state{
|
terminate(Reason, State = #state{
|
||||||
chann_mod = ChannMod,
|
chann_mod = ChannMod,
|
||||||
channel = Channel}) ->
|
channel = Channel}) ->
|
||||||
?LOG(debug, "Terminated due to ~p", [Reason]),
|
?SLOG(debug, #{msg => "conn_process_terminated", reason => Reason}),
|
||||||
_ = ChannMod:terminate(Reason, Channel),
|
_ = ChannMod:terminate(Reason, Channel),
|
||||||
_ = close_socket(State),
|
_ = close_socket(State),
|
||||||
exit(Reason).
|
exit(Reason).
|
||||||
|
@ -620,7 +620,7 @@ handle_timeout(TRef, Msg, State) ->
|
||||||
parse_incoming(Data, State = #state{
|
parse_incoming(Data, State = #state{
|
||||||
chann_mod = ChannMod,
|
chann_mod = ChannMod,
|
||||||
channel = Channel}) ->
|
channel = Channel}) ->
|
||||||
?LOG(debug, "RECV ~0p", [Data]),
|
?SLOG(debug, #{msg => "RECV_data", data => Data}),
|
||||||
Oct = iolist_size(Data),
|
Oct = iolist_size(Data),
|
||||||
inc_counter(incoming_bytes, Oct),
|
inc_counter(incoming_bytes, Oct),
|
||||||
Ctx = ChannMod:info(ctx, Channel),
|
Ctx = ChannMod:info(ctx, Channel),
|
||||||
|
@ -643,8 +643,12 @@ parse_incoming(Data, Packets,
|
||||||
parse_incoming(Rest, [Packet|Packets], NState)
|
parse_incoming(Rest, [Packet|Packets], NState)
|
||||||
catch
|
catch
|
||||||
error:Reason:Stk ->
|
error:Reason:Stk ->
|
||||||
?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data:~0p",
|
?SLOG(error, #{ msg => "parse_frame_failed"
|
||||||
[Reason, Stk, Data]),
|
, at_state => ParseState
|
||||||
|
, input_bytes => Data
|
||||||
|
, reason => Reason
|
||||||
|
, stacktrace => Stk
|
||||||
|
}),
|
||||||
{[{frame_error, Reason}|Packets], State}
|
{[{frame_error, Reason}|Packets], State}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -663,7 +667,9 @@ handle_incoming(Packet, State = #state{
|
||||||
}) ->
|
}) ->
|
||||||
Ctx = ChannMod:info(ctx, Channel),
|
Ctx = ChannMod:info(ctx, Channel),
|
||||||
ok = inc_incoming_stats(Ctx, FrameMod, Packet),
|
ok = inc_incoming_stats(Ctx, FrameMod, Packet),
|
||||||
?LOG(debug, "RECV ~ts", [FrameMod:format(Packet)]),
|
?SLOG(debug, #{ msg => "RECV_packet"
|
||||||
|
, packet => FrameMod:format(Packet)
|
||||||
|
}),
|
||||||
with_channel(handle_in, [Packet], State).
|
with_channel(handle_in, [Packet], State).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -715,12 +721,17 @@ serialize_and_inc_stats_fun(#state{
|
||||||
Ctx = ChannMod:info(ctx, Channel),
|
Ctx = ChannMod:info(ctx, Channel),
|
||||||
fun(Packet) ->
|
fun(Packet) ->
|
||||||
case FrameMod:serialize_pkt(Packet, Serialize) of
|
case FrameMod:serialize_pkt(Packet, Serialize) of
|
||||||
<<>> -> ?LOG(warning, "~ts is discarded due to the frame is too large!",
|
<<>> ->
|
||||||
[FrameMod:format(Packet)]),
|
?SLOG(warning, #{ msg => "packet_too_large_discarded"
|
||||||
|
, packet => FrameMod:format(Packet)
|
||||||
|
}),
|
||||||
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped.too_large'),
|
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped.too_large'),
|
||||||
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'),
|
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'),
|
||||||
<<>>;
|
<<>>;
|
||||||
Data -> ?LOG(debug, "SEND ~ts", [FrameMod:format(Packet)]),
|
Data ->
|
||||||
|
?SLOG(debug, #{ msg => "SEND_packet"
|
||||||
|
, packet => FrameMod:format(Packet)
|
||||||
|
}),
|
||||||
ok = inc_outgoing_stats(Ctx, FrameMod, Packet),
|
ok = inc_outgoing_stats(Ctx, FrameMod, Packet),
|
||||||
Data
|
Data
|
||||||
end
|
end
|
||||||
|
@ -760,7 +771,9 @@ handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info({sock_error, Reason}, State) ->
|
handle_info({sock_error, Reason}, State) ->
|
||||||
?LOG(debug, "Socket error: ~p", [Reason]),
|
?SLOG(debug, #{ msg => "sock_error"
|
||||||
|
, reason => Reason
|
||||||
|
}),
|
||||||
handle_info({sock_closed, Reason}, close_socket(State));
|
handle_info({sock_closed, Reason}, close_socket(State));
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
|
@ -775,7 +788,10 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
|
||||||
{ok, Limiter1} ->
|
{ok, Limiter1} ->
|
||||||
State#state{limiter = Limiter1};
|
State#state{limiter = Limiter1};
|
||||||
{pause, Time, Limiter1} ->
|
{pause, Time, Limiter1} ->
|
||||||
?LOG(warning, "Pause ~pms due to rate limit", [Time]),
|
%% XXX: which limiter reached?
|
||||||
|
?SLOG(warning, #{ msg => "reach_rate_limit"
|
||||||
|
, pause => Time
|
||||||
|
}),
|
||||||
TRef = emqx_misc:start_timer(Time, limit_timeout),
|
TRef = emqx_misc:start_timer(Time, limit_timeout),
|
||||||
State#state{sockstate = blocked,
|
State#state{sockstate = blocked,
|
||||||
limiter = Limiter1,
|
limiter = Limiter1,
|
||||||
|
|
|
@ -189,14 +189,14 @@ handle_call({send_request, Msg}, From, Channel) ->
|
||||||
erlang:setelement(1, Result, noreply);
|
erlang:setelement(1, Result, noreply);
|
||||||
|
|
||||||
handle_call(Req, _From, Channel) ->
|
handle_call(Req, _From, Channel) ->
|
||||||
?LOG(error, "Unexpected call: ~p", [Req]),
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
||||||
{reply, ignored, Channel}.
|
{reply, ignored, Channel}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Handle Cast
|
%% Handle Cast
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
handle_cast(Req, Channel) ->
|
handle_cast(Req, Channel) ->
|
||||||
?LOG(error, "Unexpected cast: ~p", [Req]),
|
?SLOG(error, #{msg => "unexpected_cast", cast => Req}),
|
||||||
{ok, Channel}.
|
{ok, Channel}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -206,7 +206,7 @@ handle_info({subscribe, _}, Channel) ->
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
|
|
||||||
handle_info(Info, Channel) ->
|
handle_info(Info, Channel) ->
|
||||||
?LOG(error, "Unexpected info: ~p", [Info]),
|
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
||||||
{ok, Channel}.
|
{ok, Channel}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -331,8 +331,11 @@ auth_connect(_Input, Channel = #channel{ctx = Ctx,
|
||||||
{ok, NClientInfo} ->
|
{ok, NClientInfo} ->
|
||||||
{ok, Channel#channel{clientinfo = NClientInfo}};
|
{ok, Channel#channel{clientinfo = NClientInfo}};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?LOG(warning, "Client ~ts (Username: '~ts') login failed for ~0p",
|
?SLOG(warning, #{ msg => "client_login_failed"
|
||||||
[ClientId, Username, Reason]),
|
, username => Username
|
||||||
|
, clientid => ClientId
|
||||||
|
, reason => Reason
|
||||||
|
}),
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -375,7 +378,10 @@ process_connect(#channel{ctx = Ctx,
|
||||||
reply({ok, created}, Token, Msg, Result),
|
reply({ok, created}, Token, Msg, Result),
|
||||||
Channel#channel{token = Token});
|
Channel#channel{token = Token});
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?LOG(error, "Failed to open session du to ~p", [Reason]),
|
?SLOG(error, #{ msg => "failed_open_session"
|
||||||
|
, clientid => maps:get(clientid, ClientInfo)
|
||||||
|
, reason => Reason
|
||||||
|
}),
|
||||||
iter(Iter, reply({error, bad_request}, Msg, Result), Channel)
|
iter(Iter, reply({error, bad_request}, Msg, Result), Channel)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -110,8 +110,9 @@ clients_insta(get, #{ bindings := #{name := Name0,
|
||||||
[ClientInfo] ->
|
[ClientInfo] ->
|
||||||
{200, ClientInfo};
|
{200, ClientInfo};
|
||||||
[ClientInfo | _More] ->
|
[ClientInfo | _More] ->
|
||||||
?LOG(warning, "More than one client info was returned on ~ts",
|
?SLOG(warning, #{ msg => "more_than_one_channel_found"
|
||||||
[ClientId]),
|
, clientid => ClientId
|
||||||
|
}),
|
||||||
{200, ClientInfo};
|
{200, ClientInfo};
|
||||||
[] ->
|
[] ->
|
||||||
return_http_error(404, "Client not found")
|
return_http_error(404, "Client not found")
|
||||||
|
|
|
@ -40,7 +40,6 @@ stop(_State) ->
|
||||||
|
|
||||||
load_default_gateway_applications() ->
|
load_default_gateway_applications() ->
|
||||||
Apps = gateway_type_searching(),
|
Apps = gateway_type_searching(),
|
||||||
?LOG(info, "Starting the default gateway types: ~p", [Apps]),
|
|
||||||
lists:foreach(fun reg/1, Apps).
|
lists:foreach(fun reg/1, Apps).
|
||||||
|
|
||||||
gateway_type_searching() ->
|
gateway_type_searching() ->
|
||||||
|
@ -51,12 +50,16 @@ gateway_type_searching() ->
|
||||||
reg(Mod) ->
|
reg(Mod) ->
|
||||||
try
|
try
|
||||||
Mod:reg(),
|
Mod:reg(),
|
||||||
?LOG(info, "Register ~ts gateway application successfully!", [Mod])
|
?SLOG(debug, #{ msg => "register_gateway_succeed"
|
||||||
|
, callback_module => Mod
|
||||||
|
})
|
||||||
catch
|
catch
|
||||||
Class : Reason : Stk ->
|
Class : Reason : Stk ->
|
||||||
?LOG(error, "Failed to register ~ts gateway application: {~p, ~p}\n"
|
?SLOG(error, #{ msg => "failed_to_register_gateway"
|
||||||
"Stacktrace: ~0p",
|
, callback_module => Mod
|
||||||
[Mod, Class, Reason, Stk])
|
, reason => {Class, Reason}
|
||||||
|
, stacktrace => Stk
|
||||||
|
})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
load_gateway_by_default() ->
|
load_gateway_by_default() ->
|
||||||
|
@ -67,14 +70,19 @@ load_gateway_by_default([]) ->
|
||||||
load_gateway_by_default([{Type, Confs}|More]) ->
|
load_gateway_by_default([{Type, Confs}|More]) ->
|
||||||
case emqx_gateway_registry:lookup(Type) of
|
case emqx_gateway_registry:lookup(Type) of
|
||||||
undefined ->
|
undefined ->
|
||||||
?LOG(error, "Skip to load ~ts gateway, because it is not registered",
|
?SLOG(error, #{ msg => "skip_to_load_gateway"
|
||||||
[Type]);
|
, gateway_name => Type
|
||||||
|
});
|
||||||
_ ->
|
_ ->
|
||||||
case emqx_gateway:load(Type, Confs) of
|
case emqx_gateway:load(Type, Confs) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
?LOG(debug, "Load ~ts gateway successfully!", [Type]);
|
?SLOG(debug, #{ msg => "load_gateway_succeed"
|
||||||
|
, gateway_name => Type
|
||||||
|
});
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?LOG(error, "Failed to load ~ts gateway: ~0p", [Type, Reason])
|
?SLOG(error, #{ msg => "load_gateway_failed"
|
||||||
|
, gateway_name => Type
|
||||||
|
, reason => Reason})
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
load_gateway_by_default(More).
|
load_gateway_by_default(More).
|
||||||
|
|
|
@ -282,8 +282,12 @@ create_session(GwName, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) ->
|
||||||
Session
|
Session
|
||||||
catch
|
catch
|
||||||
Class : Reason : Stk ->
|
Class : Reason : Stk ->
|
||||||
?LOG(error, "Failed to create a session: ~p, ~p "
|
?SLOG(error, #{ msg => "failed_create_session"
|
||||||
"Stacktrace:~0p", [Class, Reason, Stk]),
|
, clientid => maps:get(clientid, ClientInfo, undefined)
|
||||||
|
, username => maps:get(username, ClientInfo, undefined)
|
||||||
|
, reason => {Class, Reason}
|
||||||
|
, stacktrace => Stk
|
||||||
|
}),
|
||||||
throw(Reason)
|
throw(Reason)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -337,7 +341,9 @@ kick_session(GwName, ClientId) ->
|
||||||
kick_session(GwName, ClientId, ChanPid);
|
kick_session(GwName, ClientId, ChanPid);
|
||||||
ChanPids ->
|
ChanPids ->
|
||||||
[ChanPid|StalePids] = lists:reverse(ChanPids),
|
[ChanPid|StalePids] = lists:reverse(ChanPids),
|
||||||
?LOG(error, "More than one channel found: ~p", [ChanPids]),
|
?SLOG(error, #{ msg => "more_than_one_channel_found"
|
||||||
|
, chan_pids => ChanPids
|
||||||
|
}),
|
||||||
lists:foreach(fun(StalePid) ->
|
lists:foreach(fun(StalePid) ->
|
||||||
catch discard_session(GwName, ClientId, StalePid)
|
catch discard_session(GwName, ClientId, StalePid)
|
||||||
end, StalePids),
|
end, StalePids),
|
||||||
|
|
|
@ -336,8 +336,10 @@ with_gateway(GwName0, Fun) ->
|
||||||
error : {update_conf_error, already_exist} ->
|
error : {update_conf_error, already_exist} ->
|
||||||
return_http_error(400, "Resource already exist");
|
return_http_error(400, "Resource already exist");
|
||||||
Class : Reason : Stk ->
|
Class : Reason : Stk ->
|
||||||
?LOG(error, "Uncatched error: {~p, ~p}, stacktrace: ~0p",
|
?SLOG(error, #{ msg => "uncatched_error"
|
||||||
[Class, Reason, Stk]),
|
, reason => {Class, Reason}
|
||||||
|
, stacktrace => Stk
|
||||||
|
}),
|
||||||
return_http_error(500, {Class, Reason, Stk})
|
return_http_error(500, {Class, Reason, Stk})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -103,7 +103,9 @@ init([Gateway, Ctx, _GwDscrptr]) ->
|
||||||
},
|
},
|
||||||
case maps:get(enable, Config, true) of
|
case maps:get(enable, Config, true) of
|
||||||
false ->
|
false ->
|
||||||
?LOG(info, "Skipp to start ~ts gateway due to disabled", [GwName]),
|
?SLOG(info, #{ msg => "skip_to_start_gateway_due_to_disabled"
|
||||||
|
, gateway_name => GwName
|
||||||
|
}),
|
||||||
{ok, State};
|
{ok, State};
|
||||||
true ->
|
true ->
|
||||||
case cb_gateway_load(State) of
|
case cb_gateway_load(State) of
|
||||||
|
@ -160,13 +162,19 @@ handle_call(_Request, _From, State) ->
|
||||||
handle_cast(_Msg, State) ->
|
handle_cast(_Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({'EXIT', Pid, Reason}, State = #state{child_pids = Pids}) ->
|
handle_info({'EXIT', Pid, Reason}, State = #state{name = Name,
|
||||||
|
child_pids = Pids}) ->
|
||||||
case lists:member(Pid, Pids) of
|
case lists:member(Pid, Pids) of
|
||||||
true ->
|
true ->
|
||||||
?LOG(error, "Child process ~p exited: ~0p.", [Pid, Reason]),
|
?SLOG(error, #{ msg => "child_process_exited"
|
||||||
|
, child => Pid
|
||||||
|
, reason => Reason
|
||||||
|
}),
|
||||||
case Pids -- [Pid]of
|
case Pids -- [Pid]of
|
||||||
[] ->
|
[] ->
|
||||||
?LOG(error, "All child process exited!"),
|
?SLOG(error, #{ msg => "gateway_all_children_process_existed"
|
||||||
|
, gateway_name => Name
|
||||||
|
}),
|
||||||
{noreply, State#state{status = stopped,
|
{noreply, State#state{status = stopped,
|
||||||
child_pids = [],
|
child_pids = [],
|
||||||
gw_state = undefined}};
|
gw_state = undefined}};
|
||||||
|
@ -174,12 +182,18 @@ handle_info({'EXIT', Pid, Reason}, State = #state{child_pids = Pids}) ->
|
||||||
{noreply, State#state{child_pids = RemainPids}}
|
{noreply, State#state{child_pids = RemainPids}}
|
||||||
end;
|
end;
|
||||||
_ ->
|
_ ->
|
||||||
?LOG(error, "Unknown process exited ~p:~0p", [Pid, Reason]),
|
?SLOG(error, #{ msg => "gateway_catch_a_unknown_process_exited"
|
||||||
|
, child => Pid
|
||||||
|
, reason => Reason
|
||||||
|
, gateway_name => Name
|
||||||
|
}),
|
||||||
{noreply, State}
|
{noreply, State}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
?LOG(warning, "Unexcepted info: ~p", [Info]),
|
?SLOG(warning, #{ msg => "unexcepted_info"
|
||||||
|
, info => Info
|
||||||
|
}),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, State = #state{child_pids = Pids}) ->
|
terminate(_Reason, State = #state{child_pids = Pids}) ->
|
||||||
|
@ -266,14 +280,18 @@ do_create_authn_chain(ChainName, AuthConf) ->
|
||||||
case emqx_authentication:create_authenticator(ChainName, AuthConf) of
|
case emqx_authentication:create_authenticator(ChainName, AuthConf) of
|
||||||
{ok, _} -> ok;
|
{ok, _} -> ok;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?LOG(error, "Failed to create authenticator chain ~ts, "
|
?SLOG(error, #{ msg => "failed_to_create_authenticator"
|
||||||
"reason: ~p, config: ~p",
|
, chain_name => ChainName
|
||||||
[ChainName, Reason, AuthConf]),
|
, reason => Reason
|
||||||
|
, config => AuthConf
|
||||||
|
}),
|
||||||
throw({badauth, Reason})
|
throw({badauth, Reason})
|
||||||
end;
|
end;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?LOG(error, "Falied to create authn chain ~ts, reason ~p",
|
?SLOG(error, #{ msg => "failed_to_create_authn_chanin"
|
||||||
[ChainName, Reason]),
|
, chain_name => ChainName
|
||||||
|
, reason => Reason
|
||||||
|
}),
|
||||||
throw({badauth, Reason})
|
throw({badauth, Reason})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -293,8 +311,10 @@ do_deinit_authn(Names) ->
|
||||||
ok -> ok;
|
ok -> ok;
|
||||||
{error, {not_found, _}} -> ok;
|
{error, {not_found, _}} -> ok;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?LOG(error, "Failed to clean authentication chain: ~ts, "
|
?SLOG(error, #{ msg => "failed_to_clean_authn_chain"
|
||||||
"reason: ~p", [ChainName, Reason])
|
, chain_name => ChainName
|
||||||
|
, reason => Reason
|
||||||
|
})
|
||||||
end
|
end
|
||||||
end, Names).
|
end, Names).
|
||||||
|
|
||||||
|
@ -348,10 +368,12 @@ cb_gateway_unload(State = #state{name = GwName,
|
||||||
stopped_at = erlang:system_time(millisecond)}}
|
stopped_at = erlang:system_time(millisecond)}}
|
||||||
catch
|
catch
|
||||||
Class : Reason : Stk ->
|
Class : Reason : Stk ->
|
||||||
?LOG(error, "Failed to unload gateway (~0p, ~0p) crashed: "
|
?SLOG(error, #{ msg => "unload_gateway_crashed"
|
||||||
"{~p, ~p}, stacktrace: ~0p",
|
, gateway_name => GwName
|
||||||
[GwName, GwState,
|
, inner_state => GwState
|
||||||
Class, Reason, Stk]),
|
, reason => {Class, Reason}
|
||||||
|
, stacktrace => Stk
|
||||||
|
}),
|
||||||
{error, {Class, Reason, Stk}}
|
{error, {Class, Reason, Stk}}
|
||||||
after
|
after
|
||||||
_ = do_deinit_authn(State#state.authns)
|
_ = do_deinit_authn(State#state.authns)
|
||||||
|
@ -388,10 +410,13 @@ cb_gateway_load(State = #state{name = GwName,
|
||||||
end
|
end
|
||||||
catch
|
catch
|
||||||
Class : Reason1 : Stk ->
|
Class : Reason1 : Stk ->
|
||||||
?LOG(error, "Failed to load ~ts gateway (~0p, ~0p) "
|
?SLOG(error, #{ msg => "load_gateway_crashed"
|
||||||
"crashed: {~p, ~p}, stacktrace: ~0p",
|
, gateway_name => GwName
|
||||||
[GwName, Gateway, Ctx,
|
, gateway => Gateway
|
||||||
Class, Reason1, Stk]),
|
, ctx => Ctx
|
||||||
|
, reason => {Class, Reason1}
|
||||||
|
, stacktrace => Stk
|
||||||
|
}),
|
||||||
{error, {Class, Reason1, Stk}}
|
{error, {Class, Reason1, Stk}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -413,9 +438,12 @@ cb_gateway_update(Config,
|
||||||
end
|
end
|
||||||
catch
|
catch
|
||||||
Class : Reason1 : Stk ->
|
Class : Reason1 : Stk ->
|
||||||
?LOG(error, "Failed to update ~ts gateway to config: ~0p crashed: "
|
?SLOG(error, #{ msg => "update_gateway_crashed"
|
||||||
"{~p, ~p}, stacktrace: ~0p",
|
, gateway_name => GwName
|
||||||
[GwName, Config, Class, Reason1, Stk]),
|
, new_config => Config
|
||||||
|
, reason => {Class, Reason1}
|
||||||
|
, stacktrace => Stk
|
||||||
|
}),
|
||||||
{error, {Class, Reason1, Stk}}
|
{error, {Class, Reason1, Stk}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -263,7 +263,9 @@ handle_call(close, _From, Channel) ->
|
||||||
|
|
||||||
handle_call({auth, ClientInfo, _Password}, _From,
|
handle_call({auth, ClientInfo, _Password}, _From,
|
||||||
Channel = #channel{conn_state = connected}) ->
|
Channel = #channel{conn_state = connected}) ->
|
||||||
?LOG(warning, "Duplicated authorized command, dropped ~p", [ClientInfo]),
|
?SLOG(warning, #{ msg => "ingore_duplicated_authorized_command"
|
||||||
|
, request_clientinfo => ClientInfo
|
||||||
|
}),
|
||||||
{reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel};
|
{reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel};
|
||||||
handle_call({auth, ClientInfo0, Password}, _From,
|
handle_call({auth, ClientInfo0, Password}, _From,
|
||||||
Channel = #channel{
|
Channel = #channel{
|
||||||
|
@ -291,18 +293,25 @@ handle_call({auth, ClientInfo0, Password}, _From,
|
||||||
SessFun
|
SessFun
|
||||||
) of
|
) of
|
||||||
{ok, _Session} ->
|
{ok, _Session} ->
|
||||||
?LOG(debug, "Client ~ts (Username: '~ts') authorized successfully!",
|
?SLOG(debug, #{ msg => "client_login_succeed"
|
||||||
[ClientId, Username]),
|
, clientid => ClientId
|
||||||
|
, username => Username
|
||||||
|
}),
|
||||||
{reply, ok, [{event, connected}],
|
{reply, ok, [{event, connected}],
|
||||||
ensure_connected(Channel1#channel{clientinfo = NClientInfo})};
|
ensure_connected(Channel1#channel{clientinfo = NClientInfo})};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?LOG(warning, "Client ~ts (Username: '~ts') open session failed for ~0p",
|
?SLOG(warning, #{ msg => "client_login_failed"
|
||||||
[ClientId, Username, Reason]),
|
, clientid => ClientId
|
||||||
|
, username => Username
|
||||||
|
, reason => Reason
|
||||||
|
}),
|
||||||
{reply, {error, ?RESP_PERMISSION_DENY, Reason}, Channel}
|
{reply, {error, ?RESP_PERMISSION_DENY, Reason}, Channel}
|
||||||
end;
|
end;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?LOG(warning, "Client ~ts (Username: '~ts') login failed for ~0p",
|
?SLOG(warning, #{ msg => "client_login_failed"
|
||||||
[ClientId, Username, Reason]),
|
, clientid => ClientId
|
||||||
|
, username => Username
|
||||||
|
, reason => Reason}),
|
||||||
{reply, {error, ?RESP_PERMISSION_DENY, Reason}, Channel}
|
{reply, {error, ?RESP_PERMISSION_DENY, Reason}, Channel}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
@ -363,7 +372,9 @@ handle_call(kick, _From, Channel) ->
|
||||||
{shutdown, kicked, ok, Channel};
|
{shutdown, kicked, ok, Channel};
|
||||||
|
|
||||||
handle_call(Req, _From, Channel) ->
|
handle_call(Req, _From, Channel) ->
|
||||||
?LOG(warning, "Unexpected call: ~p", [Req]),
|
?SLOG(warning, #{ msg => "unexpected_call"
|
||||||
|
, call => Req
|
||||||
|
}),
|
||||||
{reply, {error, unexpected_call}, Channel}.
|
{reply, {error, unexpected_call}, Channel}.
|
||||||
|
|
||||||
-spec handle_cast(any(), channel())
|
-spec handle_cast(any(), channel())
|
||||||
|
@ -371,7 +382,9 @@ handle_call(Req, _From, Channel) ->
|
||||||
| {ok, replies(), channel()}
|
| {ok, replies(), channel()}
|
||||||
| {shutdown, Reason :: term(), channel()}.
|
| {shutdown, Reason :: term(), channel()}.
|
||||||
handle_cast(Req, Channel) ->
|
handle_cast(Req, Channel) ->
|
||||||
?WARN("Unexpected call: ~p", [Req]),
|
?SLOG(warning, #{ msg => "unexpected_call"
|
||||||
|
, call => Req
|
||||||
|
}),
|
||||||
{ok, Channel}.
|
{ok, Channel}.
|
||||||
|
|
||||||
-spec handle_info(any(), channel())
|
-spec handle_info(any(), channel())
|
||||||
|
@ -403,7 +416,9 @@ handle_info({hreply, FunName, {error, Reason}}, Channel) ->
|
||||||
{shutdown, {error, {FunName, Reason}}, Channel};
|
{shutdown, {error, {FunName, Reason}}, Channel};
|
||||||
|
|
||||||
handle_info(Info, Channel) ->
|
handle_info(Info, Channel) ->
|
||||||
?LOG(warning, "Unexpected info: ~p", [Info]),
|
?SLOG(warning, #{ msg => "unexpected_info"
|
||||||
|
, info => Info
|
||||||
|
}),
|
||||||
{ok, Channel}.
|
{ok, Channel}.
|
||||||
|
|
||||||
-spec terminate(any(), channel()) -> channel().
|
-spec terminate(any(), channel()) -> channel().
|
||||||
|
|
|
@ -82,32 +82,42 @@ handle_call(_Request, _From, State) ->
|
||||||
handle_cast({rpc, Fun, Req, Options, From}, State = #state{streams = Streams}) ->
|
handle_cast({rpc, Fun, Req, Options, From}, State = #state{streams = Streams}) ->
|
||||||
case ensure_stream_opened(Fun, Options, Streams) of
|
case ensure_stream_opened(Fun, Options, Streams) of
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?LOG(error, "CALL ~0p:~0p(~0p) failed, reason: ~0p",
|
?SLOG(error, #{ msg => "request_grpc_server_failed"
|
||||||
[?CONN_ADAPTER_MOD, Fun, Options, Reason]),
|
, function => {?CONN_ADAPTER_MOD, Fun, Options}
|
||||||
|
, reason => Reason}),
|
||||||
reply(From, Fun, {error, Reason}),
|
reply(From, Fun, {error, Reason}),
|
||||||
{noreply, State#state{streams = Streams#{Fun => undefined}}};
|
{noreply, State#state{streams = Streams#{Fun => undefined}}};
|
||||||
{ok, Stream} ->
|
{ok, Stream} ->
|
||||||
case catch grpc_client:send(Stream, Req) of
|
case catch grpc_client:send(Stream, Req) of
|
||||||
ok ->
|
ok ->
|
||||||
?LOG(debug, "Send to ~p method successfully, request: ~0p",
|
?SLOG(debug, #{ msg => "send_grpc_request_succeed"
|
||||||
[Fun, Req]),
|
, function => {?CONN_ADAPTER_MOD, Fun}
|
||||||
|
, request => Req
|
||||||
|
}),
|
||||||
reply(From, Fun, ok),
|
reply(From, Fun, ok),
|
||||||
{noreply, State#state{streams = Streams#{Fun => Stream}}};
|
{noreply, State#state{streams = Streams#{Fun => Stream}}};
|
||||||
{'EXIT', {not_found, _Stk}} ->
|
{'EXIT', {not_found, _Stk}} ->
|
||||||
%% Not found the stream, reopen it
|
%% Not found the stream, reopen it
|
||||||
?LOG(info, "Can not find the old stream ref for ~s; "
|
?SLOG(info, #{ msg => "cannt_find_old_stream_ref"
|
||||||
"re-try with a new stream!", [Fun]),
|
, function => {?CONN_ADAPTER_MOD, Fun}
|
||||||
|
}),
|
||||||
handle_cast(
|
handle_cast(
|
||||||
{rpc, Fun, Req, Options, From},
|
{rpc, Fun, Req, Options, From},
|
||||||
State#state{streams = maps:remove(Fun, Streams)});
|
State#state{streams = maps:remove(Fun, Streams)});
|
||||||
{'EXIT', {timeout, _Stk}} ->
|
{'EXIT', {timeout, _Stk}} ->
|
||||||
?LOG(error, "Send to ~p method timeout, request: ~0p",
|
?SLOG(error, #{ msg => "send_grpc_request_timeout"
|
||||||
[Fun, Req]),
|
, function => {?CONN_ADAPTER_MOD, Fun}
|
||||||
|
, request => Req
|
||||||
|
}),
|
||||||
reply(From, Fun, {error, timeout}),
|
reply(From, Fun, {error, timeout}),
|
||||||
{noreply, State#state{streams = Streams#{Fun => Stream}}};
|
{noreply, State#state{streams = Streams#{Fun => Stream}}};
|
||||||
{'EXIT', {Reason1, _Stk}} ->
|
{'EXIT', {Reason1, Stk}} ->
|
||||||
?LOG(error, "Send to ~p method failure, request: ~0p, "
|
?SLOG(error, #{ msg => "send_grpc_request_failed"
|
||||||
"stacktrace: ~0p", [Fun, Req, _Stk]),
|
, function => {?CONN_ADAPTER_MOD, Fun}
|
||||||
|
, request => Req
|
||||||
|
, error => Reason1
|
||||||
|
, stacktrace => Stk
|
||||||
|
}),
|
||||||
reply(From, Fun, {error, Reason1}),
|
reply(From, Fun, {error, Reason1}),
|
||||||
{noreply, State#state{streams = Streams#{Fun => undefined}}}
|
{noreply, State#state{streams = Streams#{Fun => undefined}}}
|
||||||
end
|
end
|
||||||
|
|
|
@ -44,14 +44,20 @@
|
||||||
-> {ok, emqx_exproto_pb:code_response(), grpc:metadata()}
|
-> {ok, emqx_exproto_pb:code_response(), grpc:metadata()}
|
||||||
| {error, grpc_cowboy_h:error_response()}.
|
| {error, grpc_cowboy_h:error_response()}.
|
||||||
send(Req = #{conn := Conn, bytes := Bytes}, Md) ->
|
send(Req = #{conn := Conn, bytes := Bytes}, Md) ->
|
||||||
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
|
?SLOG(debug, #{ msg => "recv_grpc_function_call"
|
||||||
|
, function => ?FUNCTION_NAME
|
||||||
|
, request => Req
|
||||||
|
}),
|
||||||
{ok, response(call(Conn, {send, Bytes})), Md}.
|
{ok, response(call(Conn, {send, Bytes})), Md}.
|
||||||
|
|
||||||
-spec close(emqx_exproto_pb:close_socket_request(), grpc:metadata())
|
-spec close(emqx_exproto_pb:close_socket_request(), grpc:metadata())
|
||||||
-> {ok, emqx_exproto_pb:code_response(), grpc:metadata()}
|
-> {ok, emqx_exproto_pb:code_response(), grpc:metadata()}
|
||||||
| {error, grpc_cowboy_h:error_response()}.
|
| {error, grpc_cowboy_h:error_response()}.
|
||||||
close(Req = #{conn := Conn}, Md) ->
|
close(Req = #{conn := Conn}, Md) ->
|
||||||
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
|
?SLOG(debug, #{ msg => "recv_grpc_function_call"
|
||||||
|
, function => ?FUNCTION_NAME
|
||||||
|
, request => Req
|
||||||
|
}),
|
||||||
{ok, response(call(Conn, close)), Md}.
|
{ok, response(call(Conn, close)), Md}.
|
||||||
|
|
||||||
-spec authenticate(emqx_exproto_pb:authenticate_request(), grpc:metadata())
|
-spec authenticate(emqx_exproto_pb:authenticate_request(), grpc:metadata())
|
||||||
|
@ -60,7 +66,10 @@ close(Req = #{conn := Conn}, Md) ->
|
||||||
authenticate(Req = #{conn := Conn,
|
authenticate(Req = #{conn := Conn,
|
||||||
password := Password,
|
password := Password,
|
||||||
clientinfo := ClientInfo}, Md) ->
|
clientinfo := ClientInfo}, Md) ->
|
||||||
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
|
?SLOG(debug, #{ msg => "recv_grpc_function_call"
|
||||||
|
, function => ?FUNCTION_NAME
|
||||||
|
, request => Req
|
||||||
|
}),
|
||||||
case validate(clientinfo, ClientInfo) of
|
case validate(clientinfo, ClientInfo) of
|
||||||
false ->
|
false ->
|
||||||
{ok, response({error, ?RESP_REQUIRED_PARAMS_MISSED}), Md};
|
{ok, response({error, ?RESP_REQUIRED_PARAMS_MISSED}), Md};
|
||||||
|
@ -73,10 +82,18 @@ authenticate(Req = #{conn := Conn,
|
||||||
| {error, grpc_cowboy_h:error_response()}.
|
| {error, grpc_cowboy_h:error_response()}.
|
||||||
start_timer(Req = #{conn := Conn, type := Type, interval := Interval}, Md)
|
start_timer(Req = #{conn := Conn, type := Type, interval := Interval}, Md)
|
||||||
when Type =:= 'KEEPALIVE' andalso Interval > 0 ->
|
when Type =:= 'KEEPALIVE' andalso Interval > 0 ->
|
||||||
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
|
?SLOG(debug, #{ msg => "recv_grpc_function_call"
|
||||||
|
, function => ?FUNCTION_NAME
|
||||||
|
, request => Req
|
||||||
|
}),
|
||||||
|
|
||||||
{ok, response(call(Conn, {start_timer, keepalive, Interval})), Md};
|
{ok, response(call(Conn, {start_timer, keepalive, Interval})), Md};
|
||||||
start_timer(Req, Md) ->
|
start_timer(Req, Md) ->
|
||||||
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
|
?SLOG(debug, #{ msg => "recv_grpc_function_call"
|
||||||
|
, function => ?FUNCTION_NAME
|
||||||
|
, request => Req
|
||||||
|
}),
|
||||||
|
|
||||||
{ok, response({error, ?RESP_PARAMS_TYPE_ERROR}), Md}.
|
{ok, response({error, ?RESP_PARAMS_TYPE_ERROR}), Md}.
|
||||||
|
|
||||||
-spec publish(emqx_exproto_pb:publish_request(), grpc:metadata())
|
-spec publish(emqx_exproto_pb:publish_request(), grpc:metadata())
|
||||||
|
@ -84,11 +101,18 @@ start_timer(Req, Md) ->
|
||||||
| {error, grpc_cowboy_h:error_response()}.
|
| {error, grpc_cowboy_h:error_response()}.
|
||||||
publish(Req = #{conn := Conn, topic := Topic, qos := Qos, payload := Payload}, Md)
|
publish(Req = #{conn := Conn, topic := Topic, qos := Qos, payload := Payload}, Md)
|
||||||
when ?IS_QOS(Qos) ->
|
when ?IS_QOS(Qos) ->
|
||||||
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
|
?SLOG(debug, #{ msg => "recv_grpc_function_call"
|
||||||
|
, function => ?FUNCTION_NAME
|
||||||
|
, request => Req
|
||||||
|
}),
|
||||||
|
|
||||||
{ok, response(call(Conn, {publish, Topic, Qos, Payload})), Md};
|
{ok, response(call(Conn, {publish, Topic, Qos, Payload})), Md};
|
||||||
|
|
||||||
publish(Req, Md) ->
|
publish(Req, Md) ->
|
||||||
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
|
?SLOG(debug, #{ msg => "recv_grpc_function_call"
|
||||||
|
, function => ?FUNCTION_NAME
|
||||||
|
, request => Req
|
||||||
|
}),
|
||||||
{ok, response({error, ?RESP_PARAMS_TYPE_ERROR}), Md}.
|
{ok, response({error, ?RESP_PARAMS_TYPE_ERROR}), Md}.
|
||||||
|
|
||||||
-spec subscribe(emqx_exproto_pb:subscribe_request(), grpc:metadata())
|
-spec subscribe(emqx_exproto_pb:subscribe_request(), grpc:metadata())
|
||||||
|
@ -96,18 +120,27 @@ publish(Req, Md) ->
|
||||||
| {error, grpc_cowboy_h:error_response()}.
|
| {error, grpc_cowboy_h:error_response()}.
|
||||||
subscribe(Req = #{conn := Conn, topic := Topic, qos := Qos}, Md)
|
subscribe(Req = #{conn := Conn, topic := Topic, qos := Qos}, Md)
|
||||||
when ?IS_QOS(Qos) ->
|
when ?IS_QOS(Qos) ->
|
||||||
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
|
?SLOG(debug, #{ msg => "recv_grpc_function_call"
|
||||||
|
, function => ?FUNCTION_NAME
|
||||||
|
, request => Req
|
||||||
|
}),
|
||||||
{ok, response(call(Conn, {subscribe_from_client, Topic, Qos})), Md};
|
{ok, response(call(Conn, {subscribe_from_client, Topic, Qos})), Md};
|
||||||
|
|
||||||
subscribe(Req, Md) ->
|
subscribe(Req, Md) ->
|
||||||
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
|
?SLOG(debug, #{ msg => "recv_grpc_function_call"
|
||||||
|
, function => ?FUNCTION_NAME
|
||||||
|
, request => Req
|
||||||
|
}),
|
||||||
{ok, response({error, ?RESP_PARAMS_TYPE_ERROR}), Md}.
|
{ok, response({error, ?RESP_PARAMS_TYPE_ERROR}), Md}.
|
||||||
|
|
||||||
-spec unsubscribe(emqx_exproto_pb:unsubscribe_request(), grpc:metadata())
|
-spec unsubscribe(emqx_exproto_pb:unsubscribe_request(), grpc:metadata())
|
||||||
-> {ok, emqx_exproto_pb:code_response(), grpc:metadata()}
|
-> {ok, emqx_exproto_pb:code_response(), grpc:metadata()}
|
||||||
| {error, grpc_cowboy_h:error_response()}.
|
| {error, grpc_cowboy_h:error_response()}.
|
||||||
unsubscribe(Req = #{conn := Conn, topic := Topic}, Md) ->
|
unsubscribe(Req = #{conn := Conn, topic := Topic}, Md) ->
|
||||||
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
|
?SLOG(debug, #{ msg => "recv_grpc_function_call"
|
||||||
|
, function => ?FUNCTION_NAME
|
||||||
|
, request => Req
|
||||||
|
}),
|
||||||
{ok, response(call(Conn, {unsubscribe_from_client, Topic})), Md}.
|
{ok, response(call(Conn, {unsubscribe_from_client, Topic})), Md}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -130,9 +163,12 @@ call(ConnStr, Req) ->
|
||||||
exit : timeout ->
|
exit : timeout ->
|
||||||
{error, ?RESP_UNKNOWN, <<"Connection is not answered">>};
|
{error, ?RESP_UNKNOWN, <<"Connection is not answered">>};
|
||||||
Class : Reason : Stk->
|
Class : Reason : Stk->
|
||||||
?LOG(error, "Call ~p crashed: {~0p, ~0p}, "
|
?SLOG(error, #{ msg => "call_conn_process_crashed"
|
||||||
"stacktrace: ~0p",
|
, request => Req
|
||||||
[Class, Reason, Stk]),
|
, conn_str=> ConnStr
|
||||||
|
, reason => {Class, Reason}
|
||||||
|
, stacktrace => Stk
|
||||||
|
}),
|
||||||
{error, ?RESP_UNKNOWN, <<"Unkwown crashs">>}
|
{error, ?RESP_UNKNOWN, <<"Unkwown crashs">>}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -172,14 +172,18 @@ handle_call({lookup_cmd, Path, Type}, _From, #channel{session = Session} = Chann
|
||||||
{reply, {ok, Result}, Channel};
|
{reply, {ok, Result}, Channel};
|
||||||
|
|
||||||
handle_call(Req, _From, Channel) ->
|
handle_call(Req, _From, Channel) ->
|
||||||
?LOG(error, "Unexpected call: ~p", [Req]),
|
?SLOG(error, #{ msg => "unexpected_call"
|
||||||
|
, call => Req
|
||||||
|
}),
|
||||||
{reply, ignored, Channel}.
|
{reply, ignored, Channel}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Handle Cast
|
%% Handle Cast
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
handle_cast(Req, Channel) ->
|
handle_cast(Req, Channel) ->
|
||||||
?LOG(error, "Unexpected cast: ~p", [Req]),
|
?SLOG(error, #{ msg => "unexpected_cast"
|
||||||
|
, cast => Req
|
||||||
|
}),
|
||||||
{ok, Channel}.
|
{ok, Channel}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -190,7 +194,9 @@ handle_info({subscribe, _AutoSubs}, Channel) ->
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
|
|
||||||
handle_info(Info, Channel) ->
|
handle_info(Info, Channel) ->
|
||||||
?LOG(error, "Unexpected info: ~p", [Info]),
|
?SLOG(error, #{ msg => "unexpected_info"
|
||||||
|
, info => Info
|
||||||
|
}),
|
||||||
{ok, Channel}.
|
{ok, Channel}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -283,7 +289,9 @@ check_lwm2m_version(#coap_message{options = Opts},
|
||||||
},
|
},
|
||||||
{ok, Channel#channel{conninfo = NConnInfo}};
|
{ok, Channel#channel{conninfo = NConnInfo}};
|
||||||
true ->
|
true ->
|
||||||
?LOG(error, "Reject REGISTER due to unsupported version: ~0p", [Ver]),
|
?SLOG(error, #{ msg => "reject_REGISTRE_request"
|
||||||
|
, reason => {unsupported_version, Ver}
|
||||||
|
}),
|
||||||
{error, "invalid lwm2m version", Channel}
|
{error, "invalid lwm2m version", Channel}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -313,7 +321,9 @@ enrich_clientinfo(#coap_message{options = Options} = Msg,
|
||||||
{ok, NClientInfo} = fix_mountpoint(Msg, ClientInfo),
|
{ok, NClientInfo} = fix_mountpoint(Msg, ClientInfo),
|
||||||
{ok, Channel#channel{clientinfo = NClientInfo}};
|
{ok, Channel#channel{clientinfo = NClientInfo}};
|
||||||
_ ->
|
_ ->
|
||||||
?LOG(error, "Reject REGISTER due to wrong parameters, Query=~p", [Query]),
|
?SLOG(error, #{ msg => "reject_REGISTER_request"
|
||||||
|
, reason => {wrong_paramters, Query}
|
||||||
|
}),
|
||||||
{error, "invalid queries", Channel}
|
{error, "invalid queries", Channel}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -329,8 +339,11 @@ auth_connect(_Input, Channel = #channel{ctx = Ctx,
|
||||||
{ok, Channel#channel{clientinfo = NClientInfo,
|
{ok, Channel#channel{clientinfo = NClientInfo,
|
||||||
with_context = with_context(Ctx, ClientInfo)}};
|
with_context = with_context(Ctx, ClientInfo)}};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?LOG(warning, "Client ~ts (Username: '~ts') login failed for ~0p",
|
?SLOG(warning, #{ msg => "client_login_failed"
|
||||||
[ClientId, Username, Reason]),
|
, clientid => ClientId
|
||||||
|
, username => Username
|
||||||
|
, reason => Reason
|
||||||
|
}),
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -374,7 +387,9 @@ process_connect(Channel = #channel{ctx = Ctx,
|
||||||
NewResult1 = NewResult0#{events => [{event, connected}]},
|
NewResult1 = NewResult0#{events => [{event, connected}]},
|
||||||
iter(Iter, maps:merge(Result, NewResult1), Channel);
|
iter(Iter, maps:merge(Result, NewResult1), Channel);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?LOG(error, "Failed to open session du to ~p", [Reason]),
|
?SLOG(error, #{ msg => "falied_to_open_session"
|
||||||
|
, reason => Reason
|
||||||
|
}),
|
||||||
iter(Iter, reply({error, bad_request}, Msg, Result), Channel)
|
iter(Iter, reply({error, bad_request}, Msg, Result), Channel)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -398,17 +413,24 @@ with_context(publish, [Topic, Msg], Ctx, ClientInfo) ->
|
||||||
allow ->
|
allow ->
|
||||||
emqx:publish(Msg);
|
emqx:publish(Msg);
|
||||||
_ ->
|
_ ->
|
||||||
?LOG(error, "topic:~p not allow to publish ", [Topic])
|
?SLOG(error, #{ msg => "publish_denied"
|
||||||
|
, topic => Topic
|
||||||
|
})
|
||||||
end;
|
end;
|
||||||
|
|
||||||
with_context(subscribe, [Topic, Opts], Ctx, #{username := Username} = ClientInfo) ->
|
with_context(subscribe, [Topic, Opts], Ctx, #{username := Username} = ClientInfo) ->
|
||||||
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, Topic) of
|
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, Topic) of
|
||||||
allow ->
|
allow ->
|
||||||
run_hooks(Ctx, 'session.subscribed', [ClientInfo, Topic, Opts]),
|
run_hooks(Ctx, 'session.subscribed', [ClientInfo, Topic, Opts]),
|
||||||
?LOG(debug, "Subscribe topic: ~0p, Opts: ~0p, EndpointName: ~0p", [Topic, Opts, Username]),
|
?SLOG(debug, #{ msg => "subscribe_topic_succeed"
|
||||||
|
, topic => Topic
|
||||||
|
, endpoint_name => Username
|
||||||
|
}),
|
||||||
emqx:subscribe(Topic, Username, Opts);
|
emqx:subscribe(Topic, Username, Opts);
|
||||||
_ ->
|
_ ->
|
||||||
?LOG(error, "Topic: ~0p not allow to subscribe", [Topic])
|
?SLOG(error, #{ msg => "subscribe_denied"
|
||||||
|
, topic => Topic
|
||||||
|
})
|
||||||
end;
|
end;
|
||||||
|
|
||||||
with_context(metrics, Name, Ctx, _ClientInfo) ->
|
with_context(metrics, Name, Ctx, _ClientInfo) ->
|
||||||
|
|
|
@ -168,7 +168,10 @@ read_resp_to_mqtt({ok, SuccessCode}, CoapPayload, Format, Ref) ->
|
||||||
catch
|
catch
|
||||||
error:not_implemented -> make_response(not_implemented, Ref);
|
error:not_implemented -> make_response(not_implemented, Ref);
|
||||||
_:Ex:_ST ->
|
_:Ex:_ST ->
|
||||||
?LOG(error, "~0p, bad payload format: ~0p", [Ex, CoapPayload]),
|
?SLOG(error, #{ msg => "bad_payload_format"
|
||||||
|
, payload => CoapPayload
|
||||||
|
, reason => Ex
|
||||||
|
, stacktrace => _ST}),
|
||||||
make_response(bad_request, Ref)
|
make_response(bad_request, Ref)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -25,8 +25,6 @@
|
||||||
|
|
||||||
-include("src/lwm2m/include/emqx_lwm2m.hrl").
|
-include("src/lwm2m/include/emqx_lwm2m.hrl").
|
||||||
|
|
||||||
-define(LOG(Level, Format, Args), logger:Level("LWM2M-JSON: " ++ Format, Args)).
|
|
||||||
|
|
||||||
tlv_to_json(BaseName, TlvData) ->
|
tlv_to_json(BaseName, TlvData) ->
|
||||||
DecodedTlv = emqx_lwm2m_tlv:parse(TlvData),
|
DecodedTlv = emqx_lwm2m_tlv:parse(TlvData),
|
||||||
ObjectId = object_id(BaseName),
|
ObjectId = object_id(BaseName),
|
||||||
|
|
|
@ -405,7 +405,7 @@ send_auto_observe(RegInfo, Session) ->
|
||||||
ObjectList = maps:get(<<"objectList">>, RegInfo, []),
|
ObjectList = maps:get(<<"objectList">>, RegInfo, []),
|
||||||
observe_object_list(AlternatePath, ObjectList, Session);
|
observe_object_list(AlternatePath, ObjectList, Session);
|
||||||
_ ->
|
_ ->
|
||||||
?LOG(info, "Auto Observe Disabled", []),
|
?SLOG(info, #{ msg => "skip_auto_observe_due_to_disabled"}),
|
||||||
Session
|
Session
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -435,7 +435,10 @@ observe_object(AlternatePath, ObjectPath, Session) ->
|
||||||
deliver_auto_observe_to_coap(AlternatePath, Payload, Session).
|
deliver_auto_observe_to_coap(AlternatePath, Payload, Session).
|
||||||
|
|
||||||
deliver_auto_observe_to_coap(AlternatePath, TermData, Session) ->
|
deliver_auto_observe_to_coap(AlternatePath, TermData, Session) ->
|
||||||
?LOG(info, "Auto Observe, SEND To CoAP, AlternatePath=~0p, Data=~0p ", [AlternatePath, TermData]),
|
?SLOG(info, #{ msg => "send_auto_observe"
|
||||||
|
, path => AlternatePath
|
||||||
|
, data => TermData
|
||||||
|
}),
|
||||||
{Req, Ctx} = emqx_lwm2m_cmd:mqtt_to_coap(AlternatePath, TermData),
|
{Req, Ctx} = emqx_lwm2m_cmd:mqtt_to_coap(AlternatePath, TermData),
|
||||||
maybe_do_deliver_to_coap(Ctx, Req, 0, false, Session).
|
maybe_do_deliver_to_coap(Ctx, Req, 0, false, Session).
|
||||||
|
|
||||||
|
@ -568,11 +571,15 @@ send_to_coap(#session{queue = Queue} = Session) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
send_to_coap(Ctx, Req, Session) ->
|
send_to_coap(Ctx, Req, Session) ->
|
||||||
?LOG(debug, "Deliver To CoAP, CoapRequest: ~0p", [Req]),
|
?SLOG(debug, #{ msg => "deliver_to_coap"
|
||||||
|
, coap_request => Req
|
||||||
|
}),
|
||||||
out_to_coap(Ctx, Req, Session#session{wait_ack = Ctx}).
|
out_to_coap(Ctx, Req, Session#session{wait_ack = Ctx}).
|
||||||
|
|
||||||
send_msg_not_waiting_ack(Ctx, Req, Session) ->
|
send_msg_not_waiting_ack(Ctx, Req, Session) ->
|
||||||
?LOG(debug, "Deliver To CoAP not waiting ack, CoapRequest: ~0p", [Req]),
|
?SLOG(debug, #{ msg => "deliver_to_coap_and_no_ack"
|
||||||
|
, coap_request => Req
|
||||||
|
}),
|
||||||
%% cmd_sent(Ref, LwM2MOpts).
|
%% cmd_sent(Ref, LwM2MOpts).
|
||||||
out_to_coap(Ctx, Req, Session).
|
out_to_coap(Ctx, Req, Session).
|
||||||
|
|
||||||
|
@ -636,8 +643,11 @@ deliver_to_coap(AlternatePath, JsonData, MQTT, CacheMode, WithContext, Session)
|
||||||
deliver_to_coap(AlternatePath, TermData, MQTT, CacheMode, WithContext, Session)
|
deliver_to_coap(AlternatePath, TermData, MQTT, CacheMode, WithContext, Session)
|
||||||
catch
|
catch
|
||||||
ExClass:Error:ST ->
|
ExClass:Error:ST ->
|
||||||
?LOG(error, "deliver_to_coap - Invalid JSON: ~0p, Exception: ~0p, stacktrace: ~0p",
|
?SLOG(error, #{ msg => "invaild_json_format_to_deliver"
|
||||||
[JsonData, {ExClass, Error}, ST]),
|
, data => JsonData
|
||||||
|
, reason => {ExClass, Error}
|
||||||
|
, stacktrace => ST
|
||||||
|
}),
|
||||||
WithContext(metrics, 'delivery.dropped'),
|
WithContext(metrics, 'delivery.dropped'),
|
||||||
Session
|
Session
|
||||||
end;
|
end;
|
||||||
|
|
|
@ -27,8 +27,6 @@
|
||||||
|
|
||||||
-include("src/lwm2m/include/emqx_lwm2m.hrl").
|
-include("src/lwm2m/include/emqx_lwm2m.hrl").
|
||||||
|
|
||||||
-define(LOG(Level, Format, Args), logger:Level("LWM2M-TLV: " ++ Format, Args)).
|
|
||||||
|
|
||||||
-define(TLV_TYPE_OBJECT_INSTANCE, 0).
|
-define(TLV_TYPE_OBJECT_INSTANCE, 0).
|
||||||
-define(TLV_TYPE_RESOURCE_INSTANCE, 1).
|
-define(TLV_TYPE_RESOURCE_INSTANCE, 1).
|
||||||
-define(TLV_TYPE_MULTIPLE_RESOURCE, 2).
|
-define(TLV_TYPE_MULTIPLE_RESOURCE, 2).
|
||||||
|
@ -39,8 +37,6 @@
|
||||||
-define(TLV_LEGNTH_16_BIT, 2).
|
-define(TLV_LEGNTH_16_BIT, 2).
|
||||||
-define(TLV_LEGNTH_24_BIT, 3).
|
-define(TLV_LEGNTH_24_BIT, 3).
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
%----------------------------------------------------------------------------------------------------------------------------------------
|
%----------------------------------------------------------------------------------------------------------------------------------------
|
||||||
% [#{tlv_object_instance := Id11, value := Value11}, #{tlv_object_instance := Id12, value := Value12}, ...]
|
% [#{tlv_object_instance := Id11, value := Value11}, #{tlv_object_instance := Id12, value := Value12}, ...]
|
||||||
% where Value11 and Value12 is a list:
|
% where Value11 and Value12 is a list:
|
||||||
|
|
|
@ -28,9 +28,6 @@
|
||||||
, get_resource_operations/2
|
, get_resource_operations/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(LOG(Level, Format, Args),
|
|
||||||
logger:Level("LWM2M-OBJ: " ++ Format, Args)).
|
|
||||||
|
|
||||||
% This module is for future use. Disabled now.
|
% This module is for future use. Disabled now.
|
||||||
|
|
||||||
get_obj_def(ObjectIdInt, true) ->
|
get_obj_def(ObjectIdInt, true) ->
|
||||||
|
@ -50,7 +47,6 @@ get_object_and_resource_id(ResourceNameBinary, ObjDefinition) ->
|
||||||
ResourceNameString = binary_to_list(ResourceNameBinary),
|
ResourceNameString = binary_to_list(ResourceNameBinary),
|
||||||
[#xmlText{value=ObjectId}] = xmerl_xpath:string("ObjectID/text()", ObjDefinition),
|
[#xmlText{value=ObjectId}] = xmerl_xpath:string("ObjectID/text()", ObjDefinition),
|
||||||
[#xmlAttribute{value=ResourceId}] = xmerl_xpath:string("Resources/Item/Name[.=\""++ResourceNameString++"\"]/../@ID", ObjDefinition),
|
[#xmlAttribute{value=ResourceId}] = xmerl_xpath:string("Resources/Item/Name[.=\""++ResourceNameString++"\"]/../@ID", ObjDefinition),
|
||||||
?LOG(debug, "get_object_and_resource_id ObjectId=~p, ResourceId=~p", [ObjectId, ResourceId]),
|
|
||||||
{ObjectId, ResourceId}.
|
{ObjectId, ResourceId}.
|
||||||
|
|
||||||
get_resource_type(ResourceIdInt, ObjDefinition) ->
|
get_resource_type(ResourceIdInt, ObjDefinition) ->
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
-include_lib("emqx_gateway/src/lwm2m/include/emqx_lwm2m.hrl").
|
-include_lib("emqx_gateway/src/lwm2m/include/emqx_lwm2m.hrl").
|
||||||
-include_lib("xmerl/include/xmerl.hrl").
|
-include_lib("xmerl/include/xmerl.hrl").
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
% This module is for future use. Disabled now.
|
% This module is for future use. Disabled now.
|
||||||
|
|
||||||
|
@ -37,9 +38,6 @@
|
||||||
, code_change/3
|
, code_change/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(LOG(Level, Format, Args),
|
|
||||||
logger:Level("LWM2M-OBJ-DB: " ++ Format, Args)).
|
|
||||||
|
|
||||||
-define(LWM2M_OBJECT_DEF_TAB, lwm2m_object_def_tab).
|
-define(LWM2M_OBJECT_DEF_TAB, lwm2m_object_def_tab).
|
||||||
-define(LWM2M_OBJECT_NAME_TO_ID_TAB, lwm2m_object_name_to_id_tab).
|
-define(LWM2M_OBJECT_NAME_TO_ID_TAB, lwm2m_object_name_to_id_tab).
|
||||||
|
|
||||||
|
@ -130,7 +128,11 @@ load_loop([FileName|T]) ->
|
||||||
[#xmlText{value=Name}] = xmerl_xpath:string("Name/text()", ObjectXml),
|
[#xmlText{value=Name}] = xmerl_xpath:string("Name/text()", ObjectXml),
|
||||||
ObjectId = list_to_integer(ObjectIdString),
|
ObjectId = list_to_integer(ObjectIdString),
|
||||||
NameBinary = list_to_binary(Name),
|
NameBinary = list_to_binary(Name),
|
||||||
?LOG(debug, "load_loop FileName=~p, ObjectId=~p, Name=~p", [FileName, ObjectId, NameBinary]),
|
?SLOG(debug, #{ msg => "load_object_succeed"
|
||||||
|
, filename => FileName
|
||||||
|
, object_id => ObjectId
|
||||||
|
, object_name => NameBinary
|
||||||
|
}),
|
||||||
ets:insert(?LWM2M_OBJECT_DEF_TAB, {ObjectId, ObjectXml}),
|
ets:insert(?LWM2M_OBJECT_DEF_TAB, {ObjectId, ObjectXml}),
|
||||||
ets:insert(?LWM2M_OBJECT_NAME_TO_ID_TAB, {NameBinary, ObjectId}),
|
ets:insert(?LWM2M_OBJECT_NAME_TO_ID_TAB, {NameBinary, ObjectId}),
|
||||||
load_loop(T).
|
load_loop(T).
|
||||||
|
|
|
@ -57,18 +57,24 @@ init([GwId, Port]) ->
|
||||||
sock = Sock, port = Port, duration = Duration})}.
|
sock = Sock, port = Port, duration = Duration})}.
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?LOG(error, "Unexpected request: ~p", [Req]),
|
?SLOG(error, #{ msg => "unexpected_call"
|
||||||
|
, call => Req
|
||||||
|
}),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?LOG(error, "Unexpected msg: ~p", [Msg]),
|
?SLOG(error, #{ msg => "unexpected_cast"
|
||||||
|
, cast => Msg
|
||||||
|
}),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info(broadcast_advertise, State) ->
|
handle_info(broadcast_advertise, State) ->
|
||||||
{noreply, ensure_advertise(State), hibernate};
|
{noreply, ensure_advertise(State), hibernate};
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
?LOG(error, "Unexpected info: ~p", [Info]),
|
?SLOG(error, #{ msg => "unexpected_info"
|
||||||
|
, info => Info
|
||||||
|
}),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, #state{tref = Timer}) ->
|
terminate(_Reason, #state{tref = Timer}) ->
|
||||||
|
@ -90,7 +96,9 @@ send_advertise(#state{gwid = GwId, sock = Sock, port = Port,
|
||||||
addrs = Addrs, duration = Duration}) ->
|
addrs = Addrs, duration = Duration}) ->
|
||||||
Data = emqx_sn_frame:serialize_pkt(?SN_ADVERTISE_MSG(GwId, Duration), #{}),
|
Data = emqx_sn_frame:serialize_pkt(?SN_ADVERTISE_MSG(GwId, Duration), #{}),
|
||||||
lists:foreach(fun(Addr) ->
|
lists:foreach(fun(Addr) ->
|
||||||
?LOG(debug, "SEND SN_ADVERTISE to ~p~n", [Addr]),
|
?SLOG(debug, #{ msg => "send_ADVERTISE_msg"
|
||||||
|
, address => Addr
|
||||||
|
}),
|
||||||
gen_udp:send(Sock, Addr, Port, Data)
|
gen_udp:send(Sock, Addr, Port, Data)
|
||||||
end, Addrs).
|
end, Addrs).
|
||||||
|
|
||||||
|
|
|
@ -287,8 +287,11 @@ auth_connect(_Packet, Channel = #channel{ctx = Ctx,
|
||||||
{ok, NClientInfo} ->
|
{ok, NClientInfo} ->
|
||||||
{ok, Channel#channel{clientinfo = NClientInfo}};
|
{ok, Channel#channel{clientinfo = NClientInfo}};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?LOG(warning, "Client ~ts (Username: '~ts') login failed for ~0p",
|
?SLOG(warning, #{ msg => "client_login_failed"
|
||||||
[ClientId, Username, Reason]),
|
, clientid => ClientId
|
||||||
|
, username => Username
|
||||||
|
, reason => Reason
|
||||||
|
}),
|
||||||
%% FIXME: ReasonCode?
|
%% FIXME: ReasonCode?
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
@ -321,7 +324,9 @@ process_connect(Channel = #channel{
|
||||||
handle_out(connack, ?SN_RC_ACCEPTED,
|
handle_out(connack, ?SN_RC_ACCEPTED,
|
||||||
Channel#channel{session = Session});
|
Channel#channel{session = Session});
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?LOG(error, "Failed to open session due to ~p", [Reason]),
|
?SLOG(error, #{ msg => "failed_to_open_session"
|
||||||
|
, reason => Reason
|
||||||
|
}),
|
||||||
handle_out(connack, ?SN_RC_FAILED_SESSION, Channel)
|
handle_out(connack, ?SN_RC_FAILED_SESSION, Channel)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -383,19 +388,24 @@ handle_in(?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1,
|
||||||
false ->
|
false ->
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
?LOG(debug, "Client id=~p receives a publish with QoS=-1 in idle mode!",
|
?SLOG(debug, #{ msg => "receive_qo3_message_in_idle_mode"
|
||||||
[?NEG_QOS_CLIENT_ID]),
|
, topic => TopicName
|
||||||
|
, data => Data
|
||||||
|
}),
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
|
|
||||||
handle_in(Pkt = #mqtt_sn_message{type = Type},
|
handle_in(Pkt = #mqtt_sn_message{type = Type},
|
||||||
Channel = #channel{conn_state = idle})
|
Channel = #channel{conn_state = idle})
|
||||||
when Type /= ?SN_CONNECT ->
|
when Type /= ?SN_CONNECT ->
|
||||||
?LOG(warning, "Receive unknown packet ~0p in idle state", [Pkt]),
|
?SLOG(warning, #{ msg => "receive_unknown_packet_in_idle_state"
|
||||||
|
, packet => Pkt
|
||||||
|
}),
|
||||||
shutdown(normal, Channel);
|
shutdown(normal, Channel);
|
||||||
|
|
||||||
handle_in(?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId),
|
handle_in(?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId),
|
||||||
Channel = #channel{conn_state = connecting}) ->
|
Channel = #channel{conn_state = connecting}) ->
|
||||||
?LOG(warning, "Receive connect packet in connecting state"),
|
?SLOG(warning, #{ msg => "receive_connect_packet_in_connecting_state"
|
||||||
|
}),
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
|
|
||||||
handle_in(?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId),
|
handle_in(?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId),
|
||||||
|
@ -461,12 +471,17 @@ handle_in(?SN_REGISTER_MSG(_TopicId, MsgId, TopicName),
|
||||||
clientinfo = #{clientid := ClientId}}) ->
|
clientinfo = #{clientid := ClientId}}) ->
|
||||||
case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of
|
case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of
|
||||||
TopicId when is_integer(TopicId) ->
|
TopicId when is_integer(TopicId) ->
|
||||||
?LOG(debug, "register TopicName=~p, TopicId=~p",
|
?SLOG(debug, #{ msg => "registered_topic_name"
|
||||||
[TopicName, TopicId]),
|
, topic_name => TopicName
|
||||||
|
, topic_id => TopicId
|
||||||
|
}),
|
||||||
AckPacket = ?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED),
|
AckPacket = ?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED),
|
||||||
{ok, {outgoing, AckPacket}, Channel};
|
{ok, {outgoing, AckPacket}, Channel};
|
||||||
{error, too_large} ->
|
{error, too_large} ->
|
||||||
?LOG(error, "TopicId is full! TopicName=~p", [TopicName]),
|
?SLOG(error, #{ msg => "register_topic_failed"
|
||||||
|
, topic_name => TopicName
|
||||||
|
, reason => topic_id_fulled
|
||||||
|
}),
|
||||||
AckPacket = ?SN_REGACK_MSG(
|
AckPacket = ?SN_REGACK_MSG(
|
||||||
?SN_INVALID_TOPIC_ID,
|
?SN_INVALID_TOPIC_ID,
|
||||||
MsgId,
|
MsgId,
|
||||||
|
@ -474,8 +489,10 @@ handle_in(?SN_REGISTER_MSG(_TopicId, MsgId, TopicName),
|
||||||
),
|
),
|
||||||
{ok, {outgoing, AckPacket}, Channel};
|
{ok, {outgoing, AckPacket}, Channel};
|
||||||
{error, wildcard_topic} ->
|
{error, wildcard_topic} ->
|
||||||
?LOG(error, "wildcard topic can not be registered! TopicName=~p",
|
?SLOG(error, #{ msg => "register_topic_failed"
|
||||||
[TopicName]),
|
, topic_name => TopicName
|
||||||
|
, reason => not_support_wildcard_topic
|
||||||
|
}),
|
||||||
AckPacket = ?SN_REGACK_MSG(
|
AckPacket = ?SN_REGACK_MSG(
|
||||||
?SN_INVALID_TOPIC_ID,
|
?SN_INVALID_TOPIC_ID,
|
||||||
MsgId,
|
MsgId,
|
||||||
|
@ -520,13 +537,17 @@ handle_in(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode),
|
||||||
Publishes,
|
Publishes,
|
||||||
Channel#channel{session = NSession});
|
Channel#channel{session = NSession});
|
||||||
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
||||||
?LOG(warning, "The PUBACK MsgId ~w is inuse.",
|
?SLOG(warning, #{ msg => "commit_puback_failed"
|
||||||
[MsgId]),
|
, msg_id => MsgId
|
||||||
|
, reason => msg_id_inused
|
||||||
|
}),
|
||||||
ok = metrics_inc(Ctx, 'packets.puback.inuse'),
|
ok = metrics_inc(Ctx, 'packets.puback.inuse'),
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
|
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
|
||||||
?LOG(warning, "The PUBACK MsgId ~w is not found.",
|
?SLOG(warning, #{ msg => "commit_puback_failed"
|
||||||
[MsgId]),
|
, msg_id => MsgId
|
||||||
|
, reason => not_found
|
||||||
|
}),
|
||||||
ok = metrics_inc(Ctx, 'packets.puback.missed'),
|
ok = metrics_inc(Ctx, 'packets.puback.missed'),
|
||||||
{ok, Channel}
|
{ok, Channel}
|
||||||
end;
|
end;
|
||||||
|
@ -543,7 +564,9 @@ handle_in(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode),
|
||||||
{ok, {outgoing, RegPkt}, Channel}
|
{ok, {outgoing, RegPkt}, Channel}
|
||||||
end;
|
end;
|
||||||
_ ->
|
_ ->
|
||||||
?LOG(error, "CAN NOT handle PUBACK ReturnCode=~p", [ReturnCode]),
|
?SLOG(error, #{ msg => "cannt_handle_PUBACK"
|
||||||
|
, return_code => ReturnCode
|
||||||
|
}),
|
||||||
{ok, Channel}
|
{ok, Channel}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
@ -557,11 +580,17 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBREC, MsgId),
|
||||||
NChannel = Channel#channel{session = NSession},
|
NChannel = Channel#channel{session = NSession},
|
||||||
handle_out(pubrel, MsgId, NChannel);
|
handle_out(pubrel, MsgId, NChannel);
|
||||||
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
||||||
?LOG(warning, "The PUBREC MsgId ~w is inuse.", [MsgId]),
|
?SLOG(warning, #{ msg => "commit_PUBREC_failed"
|
||||||
|
, msg_id => MsgId
|
||||||
|
, reason => msg_id_inused
|
||||||
|
}),
|
||||||
ok = metrics_inc(Ctx, 'packets.pubrec.inuse'),
|
ok = metrics_inc(Ctx, 'packets.pubrec.inuse'),
|
||||||
handle_out(pubrel, MsgId, Channel);
|
handle_out(pubrel, MsgId, Channel);
|
||||||
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
|
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
|
||||||
?LOG(warning, "The PUBREC ~w is not found.", [MsgId]),
|
?SLOG(warning, #{ msg => "commit_PUBREC_failed"
|
||||||
|
, msg_id => MsgId
|
||||||
|
, reason => not_found
|
||||||
|
}),
|
||||||
ok = metrics_inc(Ctx, 'packets.pubrec.missed'),
|
ok = metrics_inc(Ctx, 'packets.pubrec.missed'),
|
||||||
handle_out(pubrel, MsgId, Channel)
|
handle_out(pubrel, MsgId, Channel)
|
||||||
end;
|
end;
|
||||||
|
@ -573,7 +602,10 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBREL, MsgId),
|
||||||
NChannel = Channel#channel{session = NSession},
|
NChannel = Channel#channel{session = NSession},
|
||||||
handle_out(pubcomp, MsgId, NChannel);
|
handle_out(pubcomp, MsgId, NChannel);
|
||||||
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
|
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
|
||||||
?LOG(warning, "The PUBREL MsgId ~w is not found.", [MsgId]),
|
?SLOG(warning, #{ msg => "commit_PUBREL_failed"
|
||||||
|
, msg_id => MsgId
|
||||||
|
, reason => not_found
|
||||||
|
}),
|
||||||
ok = metrics_inc(Ctx, 'packets.pubrel.missed'),
|
ok = metrics_inc(Ctx, 'packets.pubrel.missed'),
|
||||||
handle_out(pubcomp, MsgId, Channel)
|
handle_out(pubcomp, MsgId, Channel)
|
||||||
end;
|
end;
|
||||||
|
@ -587,10 +619,17 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId),
|
||||||
handle_out(publish, Publishes,
|
handle_out(publish, Publishes,
|
||||||
Channel#channel{session = NSession});
|
Channel#channel{session = NSession});
|
||||||
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
||||||
|
?SLOG(warning, #{ msg => "commit_PUBCOMP_failed"
|
||||||
|
, msg_id => MsgId
|
||||||
|
, reason => msg_id_inused
|
||||||
|
}),
|
||||||
ok = metrics_inc(Ctx, 'packets.pubcomp.inuse'),
|
ok = metrics_inc(Ctx, 'packets.pubcomp.inuse'),
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
|
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
|
||||||
?LOG(warning, "The PUBCOMP MsgId ~w is not found", [MsgId]),
|
?SLOG(warning, #{ msg => "commit_PUBCOMP_failed"
|
||||||
|
, msg_id => MsgId
|
||||||
|
, reason => not_found
|
||||||
|
}),
|
||||||
ok = metrics_inc(Ctx, 'packets.pubcomp.missed'),
|
ok = metrics_inc(Ctx, 'packets.pubcomp.missed'),
|
||||||
{ok, Channel}
|
{ok, Channel}
|
||||||
end;
|
end;
|
||||||
|
@ -626,8 +665,10 @@ handle_in(UnsubPkt = ?SN_UNSUBSCRIBE_MSG(_, MsgId, TopicIdOrName),
|
||||||
UnsubAck = ?SN_UNSUBACK_MSG(MsgId),
|
UnsubAck = ?SN_UNSUBACK_MSG(MsgId),
|
||||||
{ok, outgoing_and_update(UnsubAck), NChannel};
|
{ok, outgoing_and_update(UnsubAck), NChannel};
|
||||||
{error, Reason, NChannel} ->
|
{error, Reason, NChannel} ->
|
||||||
?LOG(warning, "Unsubscribe ~p failed: ~0p",
|
?SLOG(warning, #{ msg => "unsubscribe_failed"
|
||||||
[TopicIdOrName, Reason]),
|
, topic => TopicIdOrName
|
||||||
|
, reason => Reason
|
||||||
|
}),
|
||||||
%% XXX: Even if it fails, the reply is successful.
|
%% XXX: Even if it fails, the reply is successful.
|
||||||
UnsubAck = ?SN_UNSUBACK_MSG(MsgId),
|
UnsubAck = ?SN_UNSUBACK_MSG(MsgId),
|
||||||
{ok, {outgoing, UnsubAck}, NChannel}
|
{ok, {outgoing, UnsubAck}, NChannel}
|
||||||
|
@ -674,7 +715,9 @@ handle_in(?SN_WILLMSGUPD_MSG(Payload),
|
||||||
|
|
||||||
handle_in({frame_error, Reason},
|
handle_in({frame_error, Reason},
|
||||||
Channel = #channel{conn_state = _ConnState}) ->
|
Channel = #channel{conn_state = _ConnState}) ->
|
||||||
?LOG(error, "Unexpected frame error: ~p", [Reason]),
|
?SLOG(error, #{ msg => "unexpected_frame_error"
|
||||||
|
, reason => Reason
|
||||||
|
}),
|
||||||
shutdown(Reason, Channel).
|
shutdown(Reason, Channel).
|
||||||
|
|
||||||
after_message_acked(ClientInfo, Msg, #channel{ctx = Ctx}) ->
|
after_message_acked(ClientInfo, Msg, #channel{ctx = Ctx}) ->
|
||||||
|
@ -689,13 +732,15 @@ outgoing_and_update(Pkt) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Handle Publish
|
%% Handle Publish
|
||||||
|
|
||||||
check_qos3_enable(?SN_PUBLISH_MSG(Flags, _, _, _),
|
check_qos3_enable(?SN_PUBLISH_MSG(Flags, TopicId, _MsgId, Data),
|
||||||
#channel{enable_qos3 = EnableQoS3}) ->
|
#channel{enable_qos3 = EnableQoS3}) ->
|
||||||
#mqtt_sn_flags{qos = QoS} = Flags,
|
#mqtt_sn_flags{qos = QoS} = Flags,
|
||||||
case EnableQoS3 =:= false andalso QoS =:= ?QOS_NEG1 of
|
case EnableQoS3 =:= false andalso QoS =:= ?QOS_NEG1 of
|
||||||
true ->
|
true ->
|
||||||
?LOG(debug, "The enable_qos3 is false, ignore the received "
|
?SLOG(debug, #{ msg => "ignore_msg_due_to_qos3_disabled"
|
||||||
"publish with QoS=-1 in connected mode!"),
|
, topic_id => TopicId
|
||||||
|
, data => Data
|
||||||
|
}),
|
||||||
{error, ?SN_RC_NOT_SUPPORTED};
|
{error, ?SN_RC_NOT_SUPPORTED};
|
||||||
false ->
|
false ->
|
||||||
ok
|
ok
|
||||||
|
@ -781,8 +826,9 @@ do_publish(TopicId, MsgId, Msg = #message{qos = ?QOS_2},
|
||||||
handle_out(puback , {TopicId, MsgId, ?SN_RC_NOT_SUPPORTED},
|
handle_out(puback , {TopicId, MsgId, ?SN_RC_NOT_SUPPORTED},
|
||||||
Channel);
|
Channel);
|
||||||
{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} ->
|
{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} ->
|
||||||
?LOG(warning, "Dropped the qos2 packet ~w "
|
?SLOG(warning, #{ msg => "dropped_the_qos2_packet_due_to_awaiting_rel_full"
|
||||||
"due to awaiting_rel is full.", [MsgId]),
|
, msg_id => MsgId
|
||||||
|
}),
|
||||||
ok = metrics_inc(Ctx, 'packets.publish.dropped'),
|
ok = metrics_inc(Ctx, 'packets.publish.dropped'),
|
||||||
handle_out(puback, {TopicId, MsgId, ?SN_RC_CONGESTION}, Channel)
|
handle_out(puback, {TopicId, MsgId, ?SN_RC_CONGESTION}, Channel)
|
||||||
end.
|
end.
|
||||||
|
@ -860,8 +906,10 @@ run_client_subs_hook({TopicId, TopicName, QoS},
|
||||||
case run_hooks(Ctx, 'client.subscribe',
|
case run_hooks(Ctx, 'client.subscribe',
|
||||||
[ClientInfo, #{}], TopicFilters) of
|
[ClientInfo, #{}], TopicFilters) of
|
||||||
[] ->
|
[] ->
|
||||||
?LOG(warning, "Skip to subscribe ~ts, "
|
?SLOG(warning, #{ msg => "skip_to_subscribe"
|
||||||
"due to 'client.subscribe' denied!", [TopicName]),
|
, topic_name => TopicName
|
||||||
|
, reason => "'client.subscribe' filtered it"
|
||||||
|
}),
|
||||||
{error, ?SN_EXCEED_LIMITATION};
|
{error, ?SN_EXCEED_LIMITATION};
|
||||||
[{NTopicName, NSubOpts}|_] ->
|
[{NTopicName, NSubOpts}|_] ->
|
||||||
{ok, {TopicId, NTopicName, NSubOpts}, Channel}
|
{ok, {TopicId, NTopicName, NSubOpts}, Channel}
|
||||||
|
@ -879,8 +927,10 @@ do_subscribe({TopicId, TopicName, SubOpts},
|
||||||
{ok, {TopicId, NTopicName, NSubOpts},
|
{ok, {TopicId, NTopicName, NSubOpts},
|
||||||
Channel#channel{session = NSession}};
|
Channel#channel{session = NSession}};
|
||||||
{error, ?RC_QUOTA_EXCEEDED} ->
|
{error, ?RC_QUOTA_EXCEEDED} ->
|
||||||
?LOG(warning, "Cannot subscribe ~ts due to ~ts.",
|
?SLOG(warning, #{ msg => "cannt_subscribe_due_to_quota_exceeded"
|
||||||
[TopicName, emqx_reason_codes:text(?RC_QUOTA_EXCEEDED)]),
|
, topic_name => TopicName
|
||||||
|
, reason => emqx_reason_codes:text(?RC_QUOTA_EXCEEDED)
|
||||||
|
}),
|
||||||
{error, ?SN_EXCEED_LIMITATION}
|
{error, ?SN_EXCEED_LIMITATION}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -1185,7 +1235,9 @@ handle_call(discard, _From, Channel) ->
|
||||||
% reply(ok, Channel#channel{quota = Quota});
|
% reply(ok, Channel#channel{quota = Quota});
|
||||||
|
|
||||||
handle_call(Req, _From, Channel) ->
|
handle_call(Req, _From, Channel) ->
|
||||||
?LOG(error, "Unexpected call: ~p", [Req]),
|
?SLOG(error, #{ msg => "unexpected_call"
|
||||||
|
, call => Req
|
||||||
|
}),
|
||||||
reply(ignored, Channel).
|
reply(ignored, Channel).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -1225,7 +1277,9 @@ handle_info({sock_closed, Reason},
|
||||||
|
|
||||||
handle_info({sock_closed, Reason},
|
handle_info({sock_closed, Reason},
|
||||||
Channel = #channel{conn_state = disconnected}) ->
|
Channel = #channel{conn_state = disconnected}) ->
|
||||||
?LOG(error, "Unexpected sock_closed: ~p", [Reason]),
|
?SLOG(error, #{ msg => "unexpected_sock_closed"
|
||||||
|
, reason => Reason
|
||||||
|
}),
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
|
|
||||||
handle_info(clean_authz_cache, Channel) ->
|
handle_info(clean_authz_cache, Channel) ->
|
||||||
|
@ -1233,7 +1287,9 @@ handle_info(clean_authz_cache, Channel) ->
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
|
|
||||||
handle_info(Info, Channel) ->
|
handle_info(Info, Channel) ->
|
||||||
?LOG(error, "Unexpected info: ~p", [Info]),
|
?SLOG(error, #{ msg => "unexpected_info"
|
||||||
|
, info => Info
|
||||||
|
}),
|
||||||
{ok, Channel}.
|
{ok, Channel}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -1389,7 +1445,9 @@ handle_timeout(_TRef, expire_asleep, Channel) ->
|
||||||
shutdown(asleep_timeout, Channel);
|
shutdown(asleep_timeout, Channel);
|
||||||
|
|
||||||
handle_timeout(_TRef, Msg, Channel) ->
|
handle_timeout(_TRef, Msg, Channel) ->
|
||||||
?LOG(error, "Unexpected timeout: ~p~n", [Msg]),
|
?SLOG(error, #{ msg => "unexpected_timeout"
|
||||||
|
, timeout_msg => Msg
|
||||||
|
}),
|
||||||
{ok, Channel}.
|
{ok, Channel}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -22,9 +22,7 @@
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
-include("src/mqttsn/include/emqx_sn.hrl").
|
-include("src/mqttsn/include/emqx_sn.hrl").
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
-define(LOG(Level, Format, Args),
|
|
||||||
emqx_logger:Level("MQTT-SN(registry): " ++ Format, Args)).
|
|
||||||
|
|
||||||
-export([ start_link/2
|
-export([ start_link/2
|
||||||
]).
|
]).
|
||||||
|
@ -215,15 +213,21 @@ handle_call(name, _From, State = #state{tabname = Tab}) ->
|
||||||
{reply, {Tab, self()}, State};
|
{reply, {Tab, self()}, State};
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?LOG(error, "Unexpected request: ~p", [Req]),
|
?SLOG(error, #{ msg => "unexpected_call"
|
||||||
|
, call => Req
|
||||||
|
}),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?LOG(error, "Unexpected msg: ~p", [Msg]),
|
?SLOG(error, #{ msg => "unexpected_cast"
|
||||||
|
, cast => Msg
|
||||||
|
}),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
?LOG(error, "Unexpected info: ~p", [Info]),
|
?SLOG(error, #{ msg => "unexpected_info"
|
||||||
|
, info => Info
|
||||||
|
}),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, _State) ->
|
terminate(_Reason, _State) ->
|
||||||
|
|
|
@ -280,8 +280,11 @@ auth_connect(_Packet, Channel = #channel{ctx = Ctx,
|
||||||
{ok, NClientInfo} ->
|
{ok, NClientInfo} ->
|
||||||
{ok, Channel#channel{clientinfo = NClientInfo}};
|
{ok, Channel#channel{clientinfo = NClientInfo}};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?LOG(warning, "Client ~ts (Username: '~ts') login failed for ~0p",
|
?SLOG(warning, #{ msg => "client_login_failed"
|
||||||
[ClientId, Username, Reason]),
|
, clientid => ClientId
|
||||||
|
, username => Username
|
||||||
|
, reason => Reason
|
||||||
|
}),
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -315,7 +318,9 @@ process_connect(Channel = #channel{
|
||||||
{<<"heart-beat">>, reverse_heartbeats(Heartbeat)}],
|
{<<"heart-beat">>, reverse_heartbeats(Heartbeat)}],
|
||||||
handle_out(connected, Headers, Channel#channel{session = Session});
|
handle_out(connected, Headers, Channel#channel{session = Session});
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?LOG(error, "Failed to open session du to ~p", [Reason]),
|
?SLOG(error, #{ msg => "failed_to_open_session"
|
||||||
|
, reason => Reason
|
||||||
|
}),
|
||||||
Headers = [{<<"version">>, <<"1.0,1.1,1.2">>},
|
Headers = [{<<"version">>, <<"1.0,1.1,1.2">>},
|
||||||
{<<"content-type">>, <<"text/plain">>}],
|
{<<"content-type">>, <<"text/plain">>}],
|
||||||
handle_out(connerr, {Headers, undefined, <<"Not Authenticated">>}, Channel)
|
handle_out(connerr, {Headers, undefined, <<"Not Authenticated">>}, Channel)
|
||||||
|
@ -403,8 +408,10 @@ handle_in(?PACKET(?CMD_SUBSCRIBE, Headers),
|
||||||
handle_out(receipt, receipt_id(Headers), NChannel1)
|
handle_out(receipt, receipt_id(Headers), NChannel1)
|
||||||
end;
|
end;
|
||||||
{error, ErrMsg, NChannel} ->
|
{error, ErrMsg, NChannel} ->
|
||||||
?LOG(error, "Failed to subscribe topic ~ts, reason: ~ts",
|
?SLOG(error, #{ msg => "failed_top_subscribe_topic"
|
||||||
[Topic, ErrMsg]),
|
, topic => Topic
|
||||||
|
, reason => ErrMsg
|
||||||
|
}),
|
||||||
handle_out(error, {receipt_id(Headers), ErrMsg}, NChannel)
|
handle_out(error, {receipt_id(Headers), ErrMsg}, NChannel)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
@ -507,7 +514,9 @@ handle_in(?PACKET(?CMD_DISCONNECT, Headers), Channel) ->
|
||||||
shutdown_with_recepit(normal, receipt_id(Headers), Channel);
|
shutdown_with_recepit(normal, receipt_id(Headers), Channel);
|
||||||
|
|
||||||
handle_in({frame_error, Reason}, Channel = #channel{conn_state = _ConnState}) ->
|
handle_in({frame_error, Reason}, Channel = #channel{conn_state = _ConnState}) ->
|
||||||
?LOG(error, "Unexpected frame error: ~p", [Reason]),
|
?SLOG(error, #{ msg => "unexpected_frame_error"
|
||||||
|
, reason => Reason
|
||||||
|
}),
|
||||||
shutdown(Reason, Channel).
|
shutdown(Reason, Channel).
|
||||||
|
|
||||||
with_transaction(Headers, Channel = #channel{transaction = Trans}, Fun) ->
|
with_transaction(Headers, Channel = #channel{transaction = Trans}, Fun) ->
|
||||||
|
@ -653,8 +662,10 @@ handle_call({subscribe, Topic, SubOpts}, _From,
|
||||||
NChannel1 = NChannel#channel{subscriptions = NSubs},
|
NChannel1 = NChannel#channel{subscriptions = NSubs},
|
||||||
reply(ok, NChannel1);
|
reply(ok, NChannel1);
|
||||||
{error, ErrMsg, NChannel} ->
|
{error, ErrMsg, NChannel} ->
|
||||||
?LOG(error, "Failed to subscribe topic ~ts, reason: ~ts",
|
?SLOG(error, #{ msg => "failed_to_subscribe_topic"
|
||||||
[Topic, ErrMsg]),
|
, topic => Topic
|
||||||
|
, reason => ErrMsg
|
||||||
|
}),
|
||||||
reply({error, ErrMsg}, NChannel)
|
reply({error, ErrMsg}, NChannel)
|
||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
|
@ -715,7 +726,9 @@ handle_call(list_authz_cache, _From, Channel) ->
|
||||||
% reply(ok, Channel#channel{quota = Quota});
|
% reply(ok, Channel#channel{quota = Quota});
|
||||||
|
|
||||||
handle_call(Req, _From, Channel) ->
|
handle_call(Req, _From, Channel) ->
|
||||||
?LOG(error, "Unexpected call: ~p", [Req]),
|
?SLOG(error, #{ msg => "unexpected_call"
|
||||||
|
, call => Req
|
||||||
|
}),
|
||||||
reply(ignored, Channel).
|
reply(ignored, Channel).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -755,7 +768,9 @@ handle_info({sock_closed, Reason},
|
||||||
|
|
||||||
handle_info({sock_closed, Reason},
|
handle_info({sock_closed, Reason},
|
||||||
Channel = #channel{conn_state = disconnected}) ->
|
Channel = #channel{conn_state = disconnected}) ->
|
||||||
?LOG(error, "Unexpected sock_closed: ~p", [Reason]),
|
?SLOG(error, #{ msg => "unexpected_sock_closed"
|
||||||
|
, reason => Reason
|
||||||
|
}),
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
|
|
||||||
handle_info(clean_authz_cache, Channel) ->
|
handle_info(clean_authz_cache, Channel) ->
|
||||||
|
@ -763,7 +778,9 @@ handle_info(clean_authz_cache, Channel) ->
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
|
|
||||||
handle_info(Info, Channel) ->
|
handle_info(Info, Channel) ->
|
||||||
?LOG(error, "Unexpected info: ~p", [Info]),
|
?SLOG(error, #{ msg => "unexpected_info"
|
||||||
|
, info => Info
|
||||||
|
}),
|
||||||
{ok, Channel}.
|
{ok, Channel}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -828,9 +845,10 @@ handle_deliver(Delivers,
|
||||||
},
|
},
|
||||||
[Frame|Acc];
|
[Frame|Acc];
|
||||||
false ->
|
false ->
|
||||||
?LOG(error, "Dropped message ~0p due to not found "
|
?SLOG(error, #{ msg => "dropped_message_due_to_subscription_not_found"
|
||||||
"subscription id for ~ts",
|
, message => Message
|
||||||
[Message, emqx_message:topic(Message)]),
|
, topic => emqx_message:topic(Message)
|
||||||
|
}),
|
||||||
metrics_inc('delivery.dropped', Channel),
|
metrics_inc('delivery.dropped', Channel),
|
||||||
metrics_inc('delivery.dropped.no_subid', Channel),
|
metrics_inc('delivery.dropped.no_subid', Channel),
|
||||||
Acc
|
Acc
|
||||||
|
|
Loading…
Reference in New Issue