fix(coap): to better handle coap requests in connection mode

Fixes for connection deletion and message publishing requests not taking effect
issues once the connection has been created in a different UDP port
first.
This commit is contained in:
JianBo He 2023-05-29 18:12:38 +08:00
parent 0f080cda66
commit a132df5568
4 changed files with 89 additions and 37 deletions

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_gateway, [
{description, "The Gateway management application"},
{vsn, "0.1.18"},
{vsn, "0.1.19"},
{registered, []},
{mod, {emqx_gateway_app, []}},
{applications, [kernel, stdlib, emqx, emqx_authn, emqx_ctl]},

View File

@ -33,6 +33,7 @@
-export([
open_session/5,
open_session/6,
discard_session/2,
kick_session/2,
kick_session/3,
takeover_session/2,

View File

@ -304,6 +304,8 @@ handle_call(
handle_call(subscriptions, _From, Channel = #channel{session = Session}) ->
Subs = emqx_coap_session:info(subscriptions, Session),
{reply, {ok, maps:to_list(Subs)}, Channel};
handle_call({check_token, ReqToken}, _From, Channel = #channel{token = Token}) ->
{reply, ReqToken == Token, Channel};
handle_call(kick, _From, Channel) ->
NChannel = ensure_disconnected(kicked, Channel),
shutdown_and_reply(kicked, ok, NChannel);
@ -319,6 +321,9 @@ handle_call(Req, _From, Channel) ->
-spec handle_cast(Req :: term(), channel()) ->
ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}.
handle_cast(close, Channel) ->
?SLOG(info, #{msg => "close_connection"}),
shutdown(normal, Channel);
handle_cast(Req, Channel) ->
?SLOG(error, #{msg => "unexpected_cast", cast => Req}),
{ok, Channel}.
@ -376,18 +381,54 @@ ensure_keepalive_timer(Fun, #channel{keepalive = KeepAlive} = Channel) ->
Heartbeat = emqx_keepalive:info(interval, KeepAlive),
Fun(keepalive, Heartbeat, keepalive, Channel).
check_auth_state(Msg, #channel{connection_required = Required} = Channel) ->
check_token(Required, Msg, Channel).
check_auth_state(Msg, #channel{connection_required = true} = Channel) ->
case is_create_connection_request(Msg) of
true ->
call_session(handle_request, Msg, Channel);
false ->
URIQuery = emqx_coap_message:get_option(uri_query, Msg, #{}),
case maps:get(<<"token">>, URIQuery, undefined) of
undefined ->
?SLOG(debug, #{msg => "token_required_in_conn_mode", message => Msg});
_ ->
check_token(Msg, Channel)
end
end.
is_create_connection_request(Msg = #coap_message{method = Method}) when
is_atom(Method) andalso Method =/= undefined
->
URIPath = emqx_coap_message:get_option(uri_path, Msg, []),
case URIPath of
[<<"mqtt">>, <<"connection">>] when Method == post ->
true;
_ ->
false
end;
is_create_connection_request(_Msg) ->
false.
is_delete_connection_request(Msg = #coap_message{method = Method}) when
is_atom(Method) andalso Method =/= undefined
->
URIPath = emqx_coap_message:get_option(uri_path, Msg, []),
case URIPath of
[<<"mqtt">>, <<"connection">>] when Method == delete ->
true;
_ ->
false
end;
is_delete_connection_request(_Msg) ->
false.
check_token(
true,
Msg,
#channel{
token = Token,
clientinfo = ClientInfo,
conn_state = CState
clientinfo = ClientInfo
} = Channel
) ->
IsDeleteConn = is_delete_connection_request(Msg),
#{clientid := ClientId} = ClientInfo,
case emqx_coap_message:get_option(uri_query, Msg) of
#{
@ -395,37 +436,33 @@ check_token(
<<"token">> := Token
} ->
call_session(handle_request, Msg, Channel);
#{<<"clientid">> := DesireId} ->
try_takeover(CState, DesireId, Msg, Channel);
_ ->
Reply = emqx_coap_message:piggyback({error, unauthorized}, Msg),
{ok, {outgoing, Reply}, Channel}
end;
check_token(false, Msg, Channel) ->
call_session(handle_request, Msg, Channel).
try_takeover(idle, DesireId, Msg, Channel) ->
case emqx_coap_message:get_option(uri_path, Msg, []) of
[<<"mqtt">>, <<"connection">> | _] ->
%% may be is a connect request
%% TODO need check repeat connect, unless we implement the
%% udp connection baseon the clientid
call_session(handle_request, Msg, Channel);
_ ->
case emqx_conf:get([gateway, coap, ?AUTHN], undefined) of
#{<<"clientid">> := ReqClientId, <<"token">> := ReqToken} ->
case emqx_gateway_cm:call(coap, ReqClientId, {check_token, ReqToken}) of
undefined when IsDeleteConn ->
Reply = emqx_coap_message:piggyback({ok, deleted}, Msg),
{shutdown, normal, Reply, Channel};
undefined ->
call_session(handle_request, Msg, Channel);
_ ->
do_takeover(DesireId, Msg, Channel)
end
end;
try_takeover(_, DesireId, Msg, Channel) ->
do_takeover(DesireId, Msg, Channel).
do_takeover(_DesireId, Msg, Channel) ->
%% TODO completed the takeover, now only reset the message
Reset = emqx_coap_message:reset(Msg),
{ok, {outgoing, Reset}, Channel}.
?SLOG(info, #{
msg => "remote_connection_not_found",
clientid => ReqClientId,
token => ReqToken
}),
Reply = emqx_coap_message:reset(Msg),
{shutdown, normal, Reply, Channel};
false ->
?SLOG(info, #{
msg => "request_token_invalid", clientid => ReqClientId, token => ReqToken
}),
Reply = emqx_coap_message:piggyback({error, unauthorized}, Msg),
{shutdown, normal, Reply, Channel};
true ->
call_session(handle_request, Msg, Channel)
end;
_ ->
ErrMsg = <<"Missing token or clientid in connection mode">>,
Reply = emqx_coap_message:piggyback({error, bad_request}, ErrMsg, Msg),
{shutdown, normal, Reply, Channel}
end.
run_conn_hooks(
Input,
@ -612,6 +649,9 @@ ensure_disconnected(
end,
Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
shutdown(Reason, Channel) ->
{shutdown, Reason, Channel}.
shutdown_and_reply(Reason, Reply, Channel) ->
{shutdown, Reason, Reply, Channel}.
@ -759,6 +799,17 @@ process_connection(
Channel
);
process_connection({close, Msg}, _, Channel, _) ->
Queries = emqx_coap_message:get_option(uri_query, Msg),
case maps:get(<<"clientid">>, Queries, undefined) of
undefined ->
ok;
ClientId ->
%% XXX: A cluster-level connection shutdown needs to be performed here.
%%
%% due to the possibility that the current close request may be
%% from a CoAP client from another IP + Port tuple
emqx_gateway_cm:cast(coap, ClientId, close)
end,
Reply = emqx_coap_message:piggyback({ok, deleted}, Msg),
NChannel = ensure_disconnected(normal, Channel),
{shutdown, normal, Reply, NChannel}.

View File

@ -1,6 +1,6 @@
{application, emqx_gateway_coap, [
{description, "CoAP Gateway"},
{vsn, "0.1.0"},
{vsn, "0.1.1"},
{registered, []},
{applications, [kernel, stdlib, emqx, emqx_gateway]},
{env, []},