diff --git a/apps/emqx_gateway/src/emqx_gateway.app.src b/apps/emqx_gateway/src/emqx_gateway.app.src index 487e71a06..ca99dc615 100644 --- a/apps/emqx_gateway/src/emqx_gateway.app.src +++ b/apps/emqx_gateway/src/emqx_gateway.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway, [ {description, "The Gateway management application"}, - {vsn, "0.1.18"}, + {vsn, "0.1.19"}, {registered, []}, {mod, {emqx_gateway_app, []}}, {applications, [kernel, stdlib, emqx, emqx_authn, emqx_ctl]}, diff --git a/apps/emqx_gateway/src/emqx_gateway_cm.erl b/apps/emqx_gateway/src/emqx_gateway_cm.erl index 837600811..814e37163 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm.erl @@ -33,6 +33,7 @@ -export([ open_session/5, open_session/6, + discard_session/2, kick_session/2, kick_session/3, takeover_session/2, diff --git a/apps/emqx_gateway_coap/src/emqx_coap_channel.erl b/apps/emqx_gateway_coap/src/emqx_coap_channel.erl index b90fd630d..7ef008954 100644 --- a/apps/emqx_gateway_coap/src/emqx_coap_channel.erl +++ b/apps/emqx_gateway_coap/src/emqx_coap_channel.erl @@ -304,6 +304,8 @@ handle_call( handle_call(subscriptions, _From, Channel = #channel{session = Session}) -> Subs = emqx_coap_session:info(subscriptions, Session), {reply, {ok, maps:to_list(Subs)}, Channel}; +handle_call({check_token, ReqToken}, _From, Channel = #channel{token = Token}) -> + {reply, ReqToken == Token, Channel}; handle_call(kick, _From, Channel) -> NChannel = ensure_disconnected(kicked, Channel), shutdown_and_reply(kicked, ok, NChannel); @@ -319,6 +321,9 @@ handle_call(Req, _From, Channel) -> -spec handle_cast(Req :: term(), channel()) -> ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}. +handle_cast(close, Channel) -> + ?SLOG(info, #{msg => "close_connection"}), + shutdown(normal, Channel); handle_cast(Req, Channel) -> ?SLOG(error, #{msg => "unexpected_cast", cast => Req}), {ok, Channel}. @@ -376,18 +381,54 @@ ensure_keepalive_timer(Fun, #channel{keepalive = KeepAlive} = Channel) -> Heartbeat = emqx_keepalive:info(interval, KeepAlive), Fun(keepalive, Heartbeat, keepalive, Channel). -check_auth_state(Msg, #channel{connection_required = Required} = Channel) -> - check_token(Required, Msg, Channel). +check_auth_state(Msg, #channel{connection_required = true} = Channel) -> + case is_create_connection_request(Msg) of + true -> + call_session(handle_request, Msg, Channel); + false -> + URIQuery = emqx_coap_message:get_option(uri_query, Msg, #{}), + case maps:get(<<"token">>, URIQuery, undefined) of + undefined -> + ?SLOG(debug, #{msg => "token_required_in_conn_mode", message => Msg}); + _ -> + check_token(Msg, Channel) + end + end. + +is_create_connection_request(Msg = #coap_message{method = Method}) when + is_atom(Method) andalso Method =/= undefined +-> + URIPath = emqx_coap_message:get_option(uri_path, Msg, []), + case URIPath of + [<<"mqtt">>, <<"connection">>] when Method == post -> + true; + _ -> + false + end; +is_create_connection_request(_Msg) -> + false. + +is_delete_connection_request(Msg = #coap_message{method = Method}) when + is_atom(Method) andalso Method =/= undefined +-> + URIPath = emqx_coap_message:get_option(uri_path, Msg, []), + case URIPath of + [<<"mqtt">>, <<"connection">>] when Method == delete -> + true; + _ -> + false + end; +is_delete_connection_request(_Msg) -> + false. check_token( - true, Msg, #channel{ token = Token, - clientinfo = ClientInfo, - conn_state = CState + clientinfo = ClientInfo } = Channel ) -> + IsDeleteConn = is_delete_connection_request(Msg), #{clientid := ClientId} = ClientInfo, case emqx_coap_message:get_option(uri_query, Msg) of #{ @@ -395,37 +436,33 @@ check_token( <<"token">> := Token } -> call_session(handle_request, Msg, Channel); - #{<<"clientid">> := DesireId} -> - try_takeover(CState, DesireId, Msg, Channel); - _ -> - Reply = emqx_coap_message:piggyback({error, unauthorized}, Msg), - {ok, {outgoing, Reply}, Channel} - end; -check_token(false, Msg, Channel) -> - call_session(handle_request, Msg, Channel). - -try_takeover(idle, DesireId, Msg, Channel) -> - case emqx_coap_message:get_option(uri_path, Msg, []) of - [<<"mqtt">>, <<"connection">> | _] -> - %% may be is a connect request - %% TODO need check repeat connect, unless we implement the - %% udp connection baseon the clientid - call_session(handle_request, Msg, Channel); - _ -> - case emqx_conf:get([gateway, coap, ?AUTHN], undefined) of + #{<<"clientid">> := ReqClientId, <<"token">> := ReqToken} -> + case emqx_gateway_cm:call(coap, ReqClientId, {check_token, ReqToken}) of + undefined when IsDeleteConn -> + Reply = emqx_coap_message:piggyback({ok, deleted}, Msg), + {shutdown, normal, Reply, Channel}; undefined -> - call_session(handle_request, Msg, Channel); - _ -> - do_takeover(DesireId, Msg, Channel) - end - end; -try_takeover(_, DesireId, Msg, Channel) -> - do_takeover(DesireId, Msg, Channel). - -do_takeover(_DesireId, Msg, Channel) -> - %% TODO completed the takeover, now only reset the message - Reset = emqx_coap_message:reset(Msg), - {ok, {outgoing, Reset}, Channel}. + ?SLOG(info, #{ + msg => "remote_connection_not_found", + clientid => ReqClientId, + token => ReqToken + }), + Reply = emqx_coap_message:reset(Msg), + {shutdown, normal, Reply, Channel}; + false -> + ?SLOG(info, #{ + msg => "request_token_invalid", clientid => ReqClientId, token => ReqToken + }), + Reply = emqx_coap_message:piggyback({error, unauthorized}, Msg), + {shutdown, normal, Reply, Channel}; + true -> + call_session(handle_request, Msg, Channel) + end; + _ -> + ErrMsg = <<"Missing token or clientid in connection mode">>, + Reply = emqx_coap_message:piggyback({error, bad_request}, ErrMsg, Msg), + {shutdown, normal, Reply, Channel} + end. run_conn_hooks( Input, @@ -612,6 +649,9 @@ ensure_disconnected( end, Channel#channel{conninfo = NConnInfo, conn_state = disconnected}. +shutdown(Reason, Channel) -> + {shutdown, Reason, Channel}. + shutdown_and_reply(Reason, Reply, Channel) -> {shutdown, Reason, Reply, Channel}. @@ -759,6 +799,17 @@ process_connection( Channel ); process_connection({close, Msg}, _, Channel, _) -> + Queries = emqx_coap_message:get_option(uri_query, Msg), + case maps:get(<<"clientid">>, Queries, undefined) of + undefined -> + ok; + ClientId -> + %% XXX: A cluster-level connection shutdown needs to be performed here. + %% + %% due to the possibility that the current close request may be + %% from a CoAP client from another IP + Port tuple + emqx_gateway_cm:cast(coap, ClientId, close) + end, Reply = emqx_coap_message:piggyback({ok, deleted}, Msg), NChannel = ensure_disconnected(normal, Channel), {shutdown, normal, Reply, NChannel}. diff --git a/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src b/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src index decd13bef..e03066695 100644 --- a/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src +++ b/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src @@ -1,6 +1,6 @@ {application, emqx_gateway_coap, [ {description, "CoAP Gateway"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/changes/ce/fix-10871.en.md b/changes/ce/fix-10871.en.md new file mode 100644 index 000000000..9091eccbd --- /dev/null +++ b/changes/ce/fix-10871.en.md @@ -0,0 +1,2 @@ +Fixes for connection deletion and message publishing requests not taking effect +issues once the connection has been created in a different UDP port first.