diff --git a/apps/emqx_gateway_coap/src/emqx_coap_channel.erl b/apps/emqx_gateway_coap/src/emqx_coap_channel.erl index 7ef008954..21066655e 100644 --- a/apps/emqx_gateway_coap/src/emqx_coap_channel.erl +++ b/apps/emqx_gateway_coap/src/emqx_coap_channel.erl @@ -381,6 +381,8 @@ ensure_keepalive_timer(Fun, #channel{keepalive = KeepAlive} = Channel) -> Heartbeat = emqx_keepalive:info(interval, KeepAlive), Fun(keepalive, Heartbeat, keepalive, Channel). +check_auth_state(Msg, #channel{connection_required = false} = Channel) -> + call_session(handle_request, Msg, Channel); check_auth_state(Msg, #channel{connection_required = true} = Channel) -> case is_create_connection_request(Msg) of true -> diff --git a/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl b/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl index 9b6f7ce1f..999493a79 100644 --- a/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl +++ b/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl @@ -58,14 +58,14 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> application:load(emqx_gateway_coap), ok = emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT), - emqx_mgmt_api_test_util:init_suite([emqx_authn, emqx_gateway]), + emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_authn, emqx_gateway]), ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]), Config. end_per_suite(_) -> meck:unload(emqx_access_control), {ok, _} = emqx:remove_config([<<"gateway">>, <<"coap">>]), - emqx_mgmt_api_test_util:end_suite([emqx_gateway, emqx_authn]). + emqx_mgmt_api_test_util:end_suite([emqx_gateway, emqx_authn, emqx_conf]). init_per_testcase(t_connection_with_authn_failed, Config) -> ok = meck:expect( @@ -92,6 +92,13 @@ mqtt_prefix() -> ps_prefix() -> ?PS_PREFIX. +restart_coap_with_connection_mode(Bool) -> + Conf = emqx:get_raw_config([gateway, coap]), + emqx_gateway_conf:update_gateway( + coap, + Conf#{<<"connection_required">> => atom_to_binary(Bool)} + ). + %%-------------------------------------------------------------------- %% Test Cases %%-------------------------------------------------------------------- @@ -373,6 +380,35 @@ t_on_offline_event(_) -> end, do(Fun). +t_connectionless_pubsub(_) -> + restart_coap_with_connection_mode(false), + Fun = fun(Channel) -> + Topic = <<"t/a">>, + Payload = <<"123">>, + URI = pubsub_uri(binary_to_list(Topic)), + Req = make_req(get, Payload, [{observe, 0}]), + {ok, content, _} = do_request(Channel, URI, Req), + ?LOGT("observer topic:~ts~n", [Topic]), + + %% ensure subscribe succeed + timer:sleep(100), + [SubPid] = emqx:subscribers(Topic), + ?assert(is_pid(SubPid)), + + %% publish a message + Req2 = make_req(post, Payload), + {ok, changed, _} = do_request(Channel, URI, Req2), + + {ok, content, Notify} = with_response(Channel), + ?LOGT("observer get Notif=~p", [Notify]), + + #coap_content{payload = PayloadRecv} = Notify, + + ?assertEqual(Payload, PayloadRecv) + end, + do(Fun), + restart_coap_with_connection_mode(true). + %%-------------------------------------------------------------------- %% helpers @@ -402,6 +438,9 @@ observe(Channel, Token, false) -> {ok, nocontent, _Data} = do_request(Channel, URI, Req), ok. +pubsub_uri(Topic) when is_list(Topic) -> + ?PS_PREFIX ++ "/" ++ Topic. + pubsub_uri(Topic, Token) when is_list(Topic), is_list(Token) -> ?PS_PREFIX ++ "/" ++ Topic ++ "?clientid=client1&token=" ++ Token.