Merge pull request #10950 from HJianBo/mqttsn-qos3

fix(mqttsn): checking enable_qos3 option
This commit is contained in:
JianBo He 2023-06-09 15:46:55 +08:00 committed by GitHub
commit c450a1784a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 110 additions and 58 deletions

View File

@ -23,6 +23,7 @@
-include_lib("emqx/include/types.hrl"). -include_lib("emqx/include/types.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% API %% API
-export([ -export([
@ -54,10 +55,8 @@
registry :: emqx_mqttsn_registry:registry(), registry :: emqx_mqttsn_registry:registry(),
%% Gateway Id %% Gateway Id
gateway_id :: integer(), gateway_id :: integer(),
%% Enable QoS3 %% Enable negative_qos
enable_negative_qos :: boolean(),
%% XXX: Get confs from ctx ?
enable_qos3 :: boolean(),
%% MQTT-SN Connection Info %% MQTT-SN Connection Info
conninfo :: emqx_types:conninfo(), conninfo :: emqx_types:conninfo(),
%% MQTT-SN Client Info %% MQTT-SN Client Info
@ -150,7 +149,7 @@ init(
Mountpoint = maps:get(mountpoint, Option, undefined), Mountpoint = maps:get(mountpoint, Option, undefined),
Registry = maps:get(registry, Option), Registry = maps:get(registry, Option),
GwId = maps:get(gateway_id, Option), GwId = maps:get(gateway_id, Option),
EnableQoS3 = maps:get(enable_qos3, Option, true), EnableNegQoS = maps:get(enable_qos3, Option, true),
ListenerId = ListenerId =
case maps:get(listener, Option, undefined) of case maps:get(listener, Option, undefined) of
undefined -> undefined; undefined -> undefined;
@ -183,7 +182,7 @@ init(
ctx = Ctx, ctx = Ctx,
registry = Registry, registry = Registry,
gateway_id = GwId, gateway_id = GwId,
enable_qos3 = EnableQoS3, enable_negative_qos = EnableNegQoS,
conninfo = ConnInfo, conninfo = ConnInfo,
clientinfo = ClientInfo, clientinfo = ClientInfo,
clientinfo_override = Override, clientinfo_override = Override,
@ -461,48 +460,61 @@ handle_in(?SN_ADVERTISE_MSG(_GwId, _Radius), Channel) ->
% ignore % ignore
shutdown(normal, Channel); shutdown(normal, Channel);
handle_in( handle_in(
?SN_PUBLISH_MSG( Publish =
#mqtt_sn_flags{ ?SN_PUBLISH_MSG(
qos = ?QOS_NEG1, #mqtt_sn_flags{
topic_id_type = TopicIdType qos = ?QOS_NEG1,
}, topic_id_type = TopicIdType
TopicId, },
_MsgId, TopicId,
Data MsgId,
), Data
),
Channel = #channel{conn_state = idle, registry = Registry} Channel = #channel{conn_state = idle, registry = Registry}
) -> ) ->
%% FIXME: check enable_qos3 ?? case check_negative_qos_enable(Publish, Channel) of
TopicName = ok ->
case (TopicIdType =:= ?SN_SHORT_TOPIC) of TopicName =
true -> case TopicIdType of
<<TopicId:16>>; ?SN_SHORT_TOPIC ->
false -> TopicId;
emqx_mqttsn_registry:lookup_topic( ?SN_PREDEFINED_TOPIC ->
Registry, emqx_mqttsn_registry:lookup_topic(
?NEG_QOS_CLIENT_ID, Registry,
TopicId ?NEG_QOS_CLIENT_ID,
) TopicId
end, );
_ = _ ->
case TopicName =/= undefined of undefined
true -> end,
Msg = emqx_message:make( case TopicName =/= undefined of
?NEG_QOS_CLIENT_ID, true ->
?QOS_0, Msg = emqx_message:make(
TopicName, ?NEG_QOS_CLIENT_ID,
Data ?QOS_0,
), TopicName,
emqx_broker:publish(Msg); Data
false -> ),
ok ?SLOG(debug, #{
end, msg => "receive_qo3_message_in_idle_mode",
?SLOG(debug, #{ topic => TopicName,
msg => "receive_qo3_message_in_idle_mode", data => Data
topic => TopicName, }),
data => Data _ = emqx_broker:publish(Msg),
}), ok;
{ok, Channel}; false ->
ok
end,
shutdown(normal, Channel);
{error, Rc} ->
?tp(info, ignore_negative_qos, #{
topic_id => TopicId,
msg_id => MsgId,
return_code => Rc
}),
PubAck = ?SN_PUBACK_MSG(TopicId, MsgId, Rc),
shutdown(normal, PubAck, Channel)
end;
handle_in( handle_in(
Pkt = #mqtt_sn_message{type = Type}, Pkt = #mqtt_sn_message{type = Type},
Channel = #channel{conn_state = idle} Channel = #channel{conn_state = idle}
@ -720,7 +732,7 @@ handle_in(PubPkt = ?SN_PUBLISH_MSG(_Flags, TopicId0, MsgId, _Data), Channel) ->
case case
emqx_utils:pipeline( emqx_utils:pipeline(
[ [
fun check_qos3_enable/2, fun check_negative_qos_enable/2,
fun preproc_pub_pkt/2, fun preproc_pub_pkt/2,
fun convert_topic_id_to_name/2, fun convert_topic_id_to_name/2,
fun check_pub_authz/2, fun check_pub_authz/2,
@ -733,6 +745,11 @@ handle_in(PubPkt = ?SN_PUBLISH_MSG(_Flags, TopicId0, MsgId, _Data), Channel) ->
{ok, Msg, NChannel} -> {ok, Msg, NChannel} ->
do_publish(TopicId, MsgId, Msg, NChannel); do_publish(TopicId, MsgId, Msg, NChannel);
{error, ReturnCode, NChannel} -> {error, ReturnCode, NChannel} ->
?tp(info, publish_msg_rejected, #{
topic_id => TopicId,
msg_id => MsgId,
return_code => ReturnCode
}),
handle_out(puback, {TopicId, MsgId, ReturnCode}, NChannel) handle_out(puback, {TopicId, MsgId, ReturnCode}, NChannel)
end; end;
handle_in( handle_in(
@ -1044,18 +1061,13 @@ send_next_register_or_replay_publish(
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle Publish %% Handle Publish
check_qos3_enable( check_negative_qos_enable(
?SN_PUBLISH_MSG(Flags, TopicId, _MsgId, Data), ?SN_PUBLISH_MSG(Flags, _TopicId, _MsgId, _Data),
#channel{enable_qos3 = EnableQoS3} #channel{enable_negative_qos = EnableNegQoS}
) -> ) ->
#mqtt_sn_flags{qos = QoS} = Flags, #mqtt_sn_flags{qos = QoS} = Flags,
case EnableQoS3 =:= false andalso QoS =:= ?QOS_NEG1 of case EnableNegQoS =:= false andalso QoS =:= ?QOS_NEG1 of
true -> true ->
?SLOG(debug, #{
msg => "ignore_msg_due_to_qos3_disabled",
topic_id => TopicId,
data => Data
}),
{error, ?SN_RC_NOT_SUPPORTED}; {error, ?SN_RC_NOT_SUPPORTED};
false -> false ->
ok ok

View File

@ -41,7 +41,6 @@ fields(mqttsn) ->
desc => ?DESC(mqttsn_broadcast) desc => ?DESC(mqttsn_broadcast)
} }
)}, )},
%% TODO: rename
{enable_qos3, {enable_qos3,
sc( sc(
boolean(), boolean(),

View File

@ -35,6 +35,8 @@
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(HOST, {127, 0, 0, 1}). -define(HOST, {127, 0, 0, 1}).
-define(PORT, 1884). -define(PORT, 1884).
@ -120,6 +122,20 @@ restart_mqttsn_with_subs_resume_off() ->
Conf#{<<"subs_resume">> => <<"false">>} Conf#{<<"subs_resume">> => <<"false">>}
). ).
restart_mqttsn_with_neg_qos_on() ->
Conf = emqx:get_raw_config([gateway, mqttsn]),
emqx_gateway_conf:update_gateway(
mqttsn,
Conf#{<<"enable_qos3">> => <<"true">>}
).
restart_mqttsn_with_neg_qos_off() ->
Conf = emqx:get_raw_config([gateway, mqttsn]),
emqx_gateway_conf:update_gateway(
mqttsn,
Conf#{<<"enable_qos3">> => <<"false">>}
).
restart_mqttsn_with_mountpoint(Mp) -> restart_mqttsn_with_mountpoint(Mp) ->
Conf = emqx:get_raw_config([gateway, mqttsn]), Conf = emqx:get_raw_config([gateway, mqttsn]),
emqx_gateway_conf:update_gateway( emqx_gateway_conf:update_gateway(
@ -471,7 +487,7 @@ t_subscribe_case08(_) ->
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket). gen_udp:close(Socket).
t_publish_negqos_case09(_) -> t_publish_negqos_enabled(_) ->
Dup = 0, Dup = 0,
QoS = 0, QoS = 0,
NegQoS = 3, NegQoS = 3,
@ -510,6 +526,30 @@ t_publish_negqos_case09(_) ->
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket). gen_udp:close(Socket).
t_publish_negqos_disabled(_) ->
restart_mqttsn_with_neg_qos_off(),
NegQoS = 3,
MsgId = 1,
Payload = <<"abc">>,
TopicId = ?MAX_PRED_TOPIC_ID,
{ok, Socket} = gen_udp:open(0, [binary]),
?check_trace(
begin
send_publish_msg_predefined_topic(Socket, NegQoS, MsgId, TopicId, Payload),
?assertEqual(
<<7, ?SN_PUBACK, TopicId:16, MsgId:16, ?SN_RC_NOT_SUPPORTED>>,
receive_response(Socket)
),
receive_response(Socket)
end,
fun(Trace0) ->
Trace = ?of_kind(ignore_negative_qos, Trace0),
?assertMatch([#{return_code := ?SN_RC_NOT_SUPPORTED}], Trace)
end
),
restart_mqttsn_with_neg_qos_on(),
gen_udp:close(Socket).
t_publish_qos0_case01(_) -> t_publish_qos0_case01(_) ->
Dup = 0, Dup = 0,
QoS = 0, QoS = 0,

View File

@ -0,0 +1 @@
Fix the issue where the `enable_qos` option does not take effect in the MQTT-SN gateway.