fix(mqttsn): checking enable_qos3 option

This commit is contained in:
JianBo He 2023-06-06 18:21:26 +08:00
parent 0a017dd549
commit 26748ef242
4 changed files with 110 additions and 59 deletions

View File

@ -1,6 +1,6 @@
{application, emqx_gateway_mqttsn, [
{description, "MQTT-SN Gateway"},
{vsn, "0.1.1"},
{vsn, "0.1.2"},
{registered, []},
{applications, [kernel, stdlib, emqx, emqx_gateway]},
{env, []},

View File

@ -23,6 +23,7 @@
-include_lib("emqx/include/types.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% API
-export([
@ -54,10 +55,8 @@
registry :: emqx_mqttsn_registry:registry(),
%% Gateway Id
gateway_id :: integer(),
%% Enable QoS3
%% XXX: Get confs from ctx ?
enable_qos3 :: boolean(),
%% Enable negative_qos
enable_negative_qos :: boolean(),
%% MQTT-SN Connection Info
conninfo :: emqx_types:conninfo(),
%% MQTT-SN Client Info
@ -150,7 +149,7 @@ init(
Mountpoint = maps:get(mountpoint, Option, undefined),
Registry = maps:get(registry, Option),
GwId = maps:get(gateway_id, Option),
EnableQoS3 = maps:get(enable_qos3, Option, true),
EnableNegQoS = maps:get(enable_qos3, Option, true),
ListenerId =
case maps:get(listener, Option, undefined) of
undefined -> undefined;
@ -183,7 +182,7 @@ init(
ctx = Ctx,
registry = Registry,
gateway_id = GwId,
enable_qos3 = EnableQoS3,
enable_negative_qos = EnableNegQoS,
conninfo = ConnInfo,
clientinfo = ClientInfo,
clientinfo_override = Override,
@ -461,30 +460,33 @@ handle_in(?SN_ADVERTISE_MSG(_GwId, _Radius), Channel) ->
% ignore
shutdown(normal, Channel);
handle_in(
Publish =
?SN_PUBLISH_MSG(
#mqtt_sn_flags{
qos = ?QOS_NEG1,
topic_id_type = TopicIdType
},
TopicId,
_MsgId,
MsgId,
Data
),
Channel = #channel{conn_state = idle, registry = Registry}
) ->
%% FIXME: check enable_qos3 ??
case check_negative_qos_enable(Publish, Channel) of
ok ->
TopicName =
case (TopicIdType =:= ?SN_SHORT_TOPIC) of
true ->
<<TopicId:16>>;
false ->
case TopicIdType of
?SN_SHORT_TOPIC ->
TopicId;
?SN_PREDEFINED_TOPIC ->
emqx_mqttsn_registry:lookup_topic(
Registry,
?NEG_QOS_CLIENT_ID,
TopicId
)
);
_ ->
undefined
end,
_ =
case TopicName =/= undefined of
true ->
Msg = emqx_message:make(
@ -493,16 +495,26 @@ handle_in(
TopicName,
Data
),
emqx_broker:publish(Msg);
false ->
ok
end,
?SLOG(debug, #{
msg => "receive_qo3_message_in_idle_mode",
topic => TopicName,
data => Data
}),
{ok, Channel};
emqx_broker:publish(Msg),
ok;
false ->
ok
end,
shutdown(normal, Channel);
{error, Rc} ->
?tp(info, ignore_negative_qos, #{
topic_id => TopicId,
msg_id => MsgId,
return_code => Rc
}),
PubAck = ?SN_PUBACK_MSG(TopicId, MsgId, Rc),
shutdown(normal, PubAck, Channel)
end;
handle_in(
Pkt = #mqtt_sn_message{type = Type},
Channel = #channel{conn_state = idle}
@ -720,7 +732,7 @@ handle_in(PubPkt = ?SN_PUBLISH_MSG(_Flags, TopicId0, MsgId, _Data), Channel) ->
case
emqx_utils:pipeline(
[
fun check_qos3_enable/2,
fun check_negative_qos_enable/2,
fun preproc_pub_pkt/2,
fun convert_topic_id_to_name/2,
fun check_pub_authz/2,
@ -733,6 +745,11 @@ handle_in(PubPkt = ?SN_PUBLISH_MSG(_Flags, TopicId0, MsgId, _Data), Channel) ->
{ok, Msg, NChannel} ->
do_publish(TopicId, MsgId, Msg, NChannel);
{error, ReturnCode, NChannel} ->
?tp(info, publish_msg_rejected, #{
topic_id => TopicId,
msg_id => MsgId,
return_code => ReturnCode
}),
handle_out(puback, {TopicId, MsgId, ReturnCode}, NChannel)
end;
handle_in(
@ -1044,18 +1061,13 @@ send_next_register_or_replay_publish(
%%--------------------------------------------------------------------
%% Handle Publish
check_qos3_enable(
?SN_PUBLISH_MSG(Flags, TopicId, _MsgId, Data),
#channel{enable_qos3 = EnableQoS3}
check_negative_qos_enable(
?SN_PUBLISH_MSG(Flags, _TopicId, _MsgId, _Data),
#channel{enable_negative_qos = EnableNegQoS}
) ->
#mqtt_sn_flags{qos = QoS} = Flags,
case EnableQoS3 =:= false andalso QoS =:= ?QOS_NEG1 of
case EnableNegQoS =:= false andalso QoS =:= ?QOS_NEG1 of
true ->
?SLOG(debug, #{
msg => "ignore_msg_due_to_qos3_disabled",
topic_id => TopicId,
data => Data
}),
{error, ?SN_RC_NOT_SUPPORTED};
false ->
ok

View File

@ -41,7 +41,6 @@ fields(mqttsn) ->
desc => ?DESC(mqttsn_broadcast)
}
)},
%% TODO: rename
{enable_qos3,
sc(
boolean(),

View File

@ -35,6 +35,8 @@
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(HOST, {127, 0, 0, 1}).
-define(PORT, 1884).
@ -120,6 +122,20 @@ restart_mqttsn_with_subs_resume_off() ->
Conf#{<<"subs_resume">> => <<"false">>}
).
restart_mqttsn_with_neg_qos_on() ->
Conf = emqx:get_raw_config([gateway, mqttsn]),
emqx_gateway_conf:update_gateway(
mqttsn,
Conf#{<<"enable_qos3">> => <<"true">>}
).
restart_mqttsn_with_neg_qos_off() ->
Conf = emqx:get_raw_config([gateway, mqttsn]),
emqx_gateway_conf:update_gateway(
mqttsn,
Conf#{<<"enable_qos3">> => <<"false">>}
).
default_config() ->
?CONF_DEFAULT.
@ -464,7 +480,7 @@ t_subscribe_case08(_) ->
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket).
t_publish_negqos_case09(_) ->
t_publish_negqos_enabled(_) ->
Dup = 0,
QoS = 0,
NegQoS = 3,
@ -503,6 +519,30 @@ t_publish_negqos_case09(_) ->
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket).
t_publish_negqos_disabled(_) ->
restart_mqttsn_with_neg_qos_off(),
NegQoS = 3,
MsgId = 1,
Payload = <<"abc">>,
TopicId = ?MAX_PRED_TOPIC_ID,
{ok, Socket} = gen_udp:open(0, [binary]),
?check_trace(
begin
send_publish_msg_predefined_topic(Socket, NegQoS, MsgId, TopicId, Payload),
?assertEqual(
<<7, ?SN_PUBACK, TopicId:16, MsgId:16, ?SN_RC_NOT_SUPPORTED>>,
receive_response(Socket)
),
receive_response(Socket)
end,
fun(Trace0) ->
Trace = ?of_kind(ignore_negative_qos, Trace0),
?assertMatch([#{return_code := ?SN_RC_NOT_SUPPORTED}], Trace)
end
),
restart_mqttsn_with_neg_qos_on(),
gen_udp:close(Socket).
t_publish_qos0_case01(_) ->
Dup = 0,
QoS = 0,