fix(coap): fix crash publish/subscribe in connectionless mode

this bug was introduced by https://github.com/emqx/emqx/pull/10871
This commit is contained in:
JianBo He 2023-06-08 18:08:11 +08:00
parent 641166a67a
commit 95f0745e31
2 changed files with 43 additions and 2 deletions

View File

@ -381,6 +381,8 @@ ensure_keepalive_timer(Fun, #channel{keepalive = KeepAlive} = Channel) ->
Heartbeat = emqx_keepalive:info(interval, KeepAlive),
Fun(keepalive, Heartbeat, keepalive, Channel).
check_auth_state(Msg, #channel{connection_required = false} = Channel) ->
call_session(handle_request, Msg, Channel);
check_auth_state(Msg, #channel{connection_required = true} = Channel) ->
case is_create_connection_request(Msg) of
true ->

View File

@ -58,14 +58,14 @@ all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
application:load(emqx_gateway_coap),
ok = emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT),
emqx_mgmt_api_test_util:init_suite([emqx_authn, emqx_gateway]),
emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_authn, emqx_gateway]),
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
Config.
end_per_suite(_) ->
meck:unload(emqx_access_control),
{ok, _} = emqx:remove_config([<<"gateway">>, <<"coap">>]),
emqx_mgmt_api_test_util:end_suite([emqx_gateway, emqx_authn]).
emqx_mgmt_api_test_util:end_suite([emqx_gateway, emqx_authn, emqx_conf]).
init_per_testcase(t_connection_with_authn_failed, Config) ->
ok = meck:expect(
@ -92,6 +92,13 @@ mqtt_prefix() ->
ps_prefix() ->
?PS_PREFIX.
restart_coap_with_connection_mode(Bool) ->
Conf = emqx:get_raw_config([gateway, coap]),
emqx_gateway_conf:update_gateway(
coap,
Conf#{<<"connection_required">> => atom_to_binary(Bool)}
).
%%--------------------------------------------------------------------
%% Test Cases
%%--------------------------------------------------------------------
@ -373,6 +380,35 @@ t_on_offline_event(_) ->
end,
do(Fun).
t_connectionless_pubsub(_) ->
restart_coap_with_connection_mode(false),
Fun = fun(Channel) ->
Topic = <<"t/a">>,
Payload = <<"123">>,
URI = pubsub_uri(binary_to_list(Topic)),
Req = make_req(get, Payload, [{observe, 0}]),
{ok, content, _} = do_request(Channel, URI, Req),
?LOGT("observer topic:~ts~n", [Topic]),
%% ensure subscribe succeed
timer:sleep(100),
[SubPid] = emqx:subscribers(Topic),
?assert(is_pid(SubPid)),
%% publish a message
Req2 = make_req(post, Payload),
{ok, changed, _} = do_request(Channel, URI, Req2),
{ok, content, Notify} = with_response(Channel),
?LOGT("observer get Notif=~p", [Notify]),
#coap_content{payload = PayloadRecv} = Notify,
?assertEqual(Payload, PayloadRecv)
end,
do(Fun),
restart_coap_with_connection_mode(true).
%%--------------------------------------------------------------------
%% helpers
@ -402,6 +438,9 @@ observe(Channel, Token, false) ->
{ok, nocontent, _Data} = do_request(Channel, URI, Req),
ok.
pubsub_uri(Topic) when is_list(Topic) ->
?PS_PREFIX ++ "/" ++ Topic.
pubsub_uri(Topic, Token) when is_list(Topic), is_list(Token) ->
?PS_PREFIX ++ "/" ++ Topic ++ "?clientid=client1&token=" ++ Token.