From 55c1a1868a61a60a67949ba3559efc1f035650e0 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 8 Aug 2022 10:47:16 +0800 Subject: [PATCH 1/5] fix(coap): remove the leading `/` in assembling publish topic --- .../src/coap/emqx_coap_channel.erl | 7 ++- .../coap/handler/emqx_coap_pubsub_handler.erl | 55 +++++++++---------- 2 files changed, 31 insertions(+), 31 deletions(-) diff --git a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl index c40cbe467..df5432fc3 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl @@ -153,7 +153,7 @@ init( mountpoint => Mountpoint } ), - + %% FIXME: it should coap.hearbeat instead of idle_timeout? Heartbeat = ?GET_IDLE_TIME(Config), #channel{ ctx = Ctx, @@ -447,6 +447,7 @@ enrich_conninfo( conninfo = ConnInfo } ) -> + %% FIXME: generate a random clientid if absent case Queries of #{<<"clientid">> := ClientId} -> Interval = maps:get(interval, emqx_keepalive:info(KeepAlive)), @@ -467,6 +468,9 @@ enrich_clientinfo( {Queries, Msg}, Channel = #channel{clientinfo = ClientInfo0} ) -> + %% FIXME: + %% 1. generate a random clientid if absent; + %% 2. assgin username, password to `undefined` if absent case Queries of #{ <<"username">> := UserName, @@ -542,6 +546,7 @@ process_connect( ) of {ok, _Sess} -> + %% FIXME: Token in cluster wide? RandVal = rand:uniform(?TOKEN_MAXIMUM), Token = erlang:list_to_binary(erlang:integer_to_list(RandVal)), NResult = Result#{events => [{event, connected}]}, diff --git a/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl b/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl index 49df1db23..2e962a0bc 100644 --- a/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl +++ b/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl @@ -69,17 +69,7 @@ handle_method(_, _, Msg, _, _) -> check_topic([]) -> error; check_topic(Path) -> - Sep = <<"/">>, - {ok, - emqx_http_lib:uri_decode( - lists:foldl( - fun(Part, Acc) -> - <> - end, - <<>>, - Path - ) - )}. + {ok, emqx_http_lib:uri_decode(iolist_to_binary(lists:join(<<"/">>, Path)))}. get_sub_opts(#coap_message{options = Opts} = Msg) -> SubOpts = maps:fold(fun parse_sub_opts/3, #{}, Opts), @@ -124,25 +114,30 @@ get_publish_qos(Msg) -> end. apply_publish_opts(Msg, MQTTMsg) -> - maps:fold( - fun - (<<"retain">>, V, Acc) -> - Val = erlang:binary_to_atom(V), - emqx_message:set_flag(retain, Val, Acc); - (<<"expiry">>, V, Acc) -> - Val = erlang:binary_to_integer(V), - Props = emqx_message:get_header(properties, Acc), - emqx_message:set_header( - properties, - Props#{'Message-Expiry-Interval' => Val}, - Acc - ); - (_, _, Acc) -> - Acc - end, - MQTTMsg, - emqx_coap_message:get_option(uri_query, Msg) - ). + case emqx_coap_message:get_option(uri_query, Msg) of + undefined -> + MQTTMsg; + Qs -> + maps:fold( + fun + (<<"retain">>, V, Acc) -> + Val = erlang:binary_to_atom(V), + emqx_message:set_flag(retain, Val, Acc); + (<<"expiry">>, V, Acc) -> + Val = erlang:binary_to_integer(V), + Props = emqx_message:get_header(properties, Acc), + emqx_message:set_header( + properties, + Props#{'Message-Expiry-Interval' => Val}, + Acc + ); + (_, _, Acc) -> + Acc + end, + MQTTMsg, + Qs + ) + end. subscribe(#coap_message{token = <<>>} = Msg, _, _, _) -> reply({error, bad_request}, <<"observe without token">>, Msg); From d6b222d1ff28de21e5b974bdaf8e5411a24555aa Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 11 Aug 2022 18:14:38 +0800 Subject: [PATCH 2/5] test: fix coap authz suite failures --- apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl index 6bbd2135b..171a0bde4 100644 --- a/apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl @@ -98,7 +98,7 @@ t_case_coap_publish(_) -> Prefix = Mod:ps_prefix(), Fun = fun(Channel, Token, Topic, Checker) -> TopicStr = binary_to_list(Topic), - URI = Prefix ++ TopicStr ++ "?clientid=client1&token=" ++ Token, + URI = Prefix ++ "/" ++ TopicStr ++ "?clientid=client1&token=" ++ Token, Req = Mod:make_req(post, <<>>), Checker(Mod:do_request(Channel, URI, Req)) @@ -114,7 +114,7 @@ t_case_coap_subscribe(_) -> Prefix = Mod:ps_prefix(), Fun = fun(Channel, Token, Topic, Checker) -> TopicStr = binary_to_list(Topic), - URI = Prefix ++ TopicStr ++ "?clientid=client1&token=" ++ Token, + URI = Prefix ++ "/" ++ TopicStr ++ "?clientid=client1&token=" ++ Token, Req = Mod:make_req(get, <<>>, [{observe, 0}]), Checker(Mod:do_request(Channel, URI, Req)) From da4efc11c2f168898424f132ac1827a3a0389f33 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 6 Sep 2022 15:20:05 +0800 Subject: [PATCH 3/5] chore: log authn-http parsing failed reason --- apps/emqx_authn/src/simple_authn/emqx_authn_http.erl | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl index 2304cf1e4..8489debcd 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl @@ -361,7 +361,12 @@ handle_response(Headers, Body) -> _ -> ignore end; - {error, _Reason} -> + {error, Reason} -> + ?TRACE_AUTHN_PROVIDER( + error, + "parse_http_response_failed", + #{content_type => ContentType, body => Body, reason => Reason} + ), ignore end. From 0caaccaa0fb2c83cf266b9d43d36a3962b55a1ab Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 6 Sep 2022 15:20:50 +0800 Subject: [PATCH 4/5] test: add tests for `/` leading topic --- apps/emqx_gateway/test/emqx_coap_SUITE.erl | 80 +++++++++++++++------- 1 file changed, 54 insertions(+), 26 deletions(-) diff --git a/apps/emqx_gateway/test/emqx_coap_SUITE.erl b/apps/emqx_gateway/test/emqx_coap_SUITE.erl index e672e2d59..f6b32c68c 100644 --- a/apps/emqx_gateway/test/emqx_coap_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_coap_SUITE.erl @@ -143,12 +143,15 @@ t_connection_with_authn_failed(_) -> ok. t_publish(_) -> - Action = fun(Channel, Token) -> - Topic = <<"/abc">>, + %% can publish to a normal topic + Topics = [ + <<"abc">>, + %% can publish to a `/` leading topic + <<"/abc">> + ], + Action = fun(Topic, Channel, Token) -> Payload = <<"123">>, - - TopicStr = binary_to_list(Topic), - URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token, + URI = pubsub_uri(binary_to_list(Topic), Token), %% Sub topic first emqx:subscribe(Topic), @@ -164,24 +167,28 @@ t_publish(_) -> ?assert(false) end end, - with_connection(Action). + with_connection(Topics, Action). t_subscribe(_) -> - Topic = <<"/abc">>, - Fun = fun(Channel, Token) -> - TopicStr = binary_to_list(Topic), + %% can subscribe to a normal topic + Topics = [ + <<"abc">>, + %% can subscribe to a `/` leading topic + <<"/abc">> + ], + Fun = fun(Topic, Channel, Token) -> Payload = <<"123">>, - - URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token, + URI = pubsub_uri(binary_to_list(Topic), Token), Req = make_req(get, Payload, [{observe, 0}]), {ok, content, _} = do_request(Channel, URI, Req), ?LOGT("observer topic:~ts~n", [Topic]), + %% ensure subscribe succeed timer:sleep(100), [SubPid] = emqx:subscribers(Topic), ?assert(is_pid(SubPid)), - %% Publish a message + %% publish a message emqx:publish(emqx_message:make(Topic, Payload)), {ok, content, Notify} = with_response(Channel), ?LOGT("observer get Notif=~p", [Notify]), @@ -191,18 +198,27 @@ t_subscribe(_) -> ?assertEqual(Payload, PayloadRecv) end, - with_connection(Fun), - timer:sleep(100), + with_connection(Topics, Fun), - ?assertEqual([], emqx:subscribers(Topic)). + %% subscription removed if coap client disconnected + timer:sleep(100), + lists:foreach( + fun(Topic) -> + ?assertEqual([], emqx:subscribers(Topic)) + end, + Topics + ). t_un_subscribe(_) -> - Topic = <<"/abc">>, - Fun = fun(Channel, Token) -> - TopicStr = binary_to_list(Topic), + %% can unsubscribe to a normal topic + Topics = [ + <<"abc">>, + %% can unsubscribe to a `/` leading topic + <<"/abc">> + ], + Fun = fun(Topic, Channel, Token) -> Payload = <<"123">>, - - URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token, + URI = pubsub_uri(binary_to_list(Topic), Token), Req = make_req(get, Payload, [{observe, 0}]), {ok, content, _} = do_request(Channel, URI, Req), @@ -219,16 +235,15 @@ t_un_subscribe(_) -> ?assertEqual([], emqx:subscribers(Topic)) end, - with_connection(Fun). + with_connection(Topics, Fun). t_observe_wildcard(_) -> Fun = fun(Channel, Token) -> %% resolve_url can't process wildcard with # - Topic = <<"/abc/+">>, - TopicStr = binary_to_list(Topic), + Topic = <<"abc/+">>, Payload = <<"123">>, - URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token, + URI = pubsub_uri(binary_to_list(Topic), Token), Req = make_req(get, Payload, [{observe, 0}]), {ok, content, _} = do_request(Channel, URI, Req), ?LOGT("observer topic:~ts~n", [Topic]), @@ -238,7 +253,7 @@ t_observe_wildcard(_) -> ?assert(is_pid(SubPid)), %% Publish a message - PubTopic = <<"/abc/def">>, + PubTopic = <<"abc/def">>, emqx:publish(emqx_message:make(PubTopic, Payload)), {ok, content, Notify} = with_response(Channel), @@ -320,7 +335,7 @@ t_clients_get_subscription_api(_) -> {200, [Subs]} = request(get, Path), - ?assertEqual(<<"/coap/observe">>, maps:get(topic, Subs)), + ?assertEqual(<<"coap/observe">>, maps:get(topic, Subs)), observe(Channel, Token, false), @@ -386,6 +401,9 @@ observe(Channel, Token, false) -> {ok, nocontent, _Data} = do_request(Channel, URI, Req), ok. +pubsub_uri(Topic, Token) when is_list(Topic), is_list(Token) -> + ?PS_PREFIX ++ "/" ++ Topic ++ "?clientid=client1&token=" ++ Token. + make_req(Method) -> make_req(Method, <<>>). @@ -442,6 +460,16 @@ with_connection(Action) -> end, do(Fun). +with_connection(Checks, Action) -> + Fun = fun(Channel) -> + Token = connection(Channel), + timer:sleep(100), + lists:foreach(fun(E) -> Action(E, Channel, Token) end, Checks), + disconnection(Channel, Token), + timer:sleep(100) + end, + do(Fun). + receive_deliver(Wait) -> receive {deliver, _, Msg} -> From 240e79a463084c4f5735db505d0a7feb3cc63deb Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 7 Sep 2022 09:32:03 +0800 Subject: [PATCH 5/5] chore: update changes --- CHANGES-5.0.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES-5.0.md b/CHANGES-5.0.md index 82d907458..2a2612b22 100644 --- a/CHANGES-5.0.md +++ b/CHANGES-5.0.md @@ -6,6 +6,7 @@ * Fix JWT plugin don't support non-integer timestamp claims. [#8867](https://github.com/emqx/emqx/pull/8867) * Avoid publishing will message when client fails to auhtenticate. [#8887](https://github.com/emqx/emqx/pull/8887) * Speed up dispatching of shared subscription messages in a cluster [#8893](https://github.com/emqx/emqx/pull/8893) +* Fix the extra / prefix when CoAP gateway parsing client topics. [#8658](https://github.com/emqx/emqx/pull/8658) ## Enhancements