diff --git a/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src b/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src index 76f0f45b5..b43201e1a 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src +++ b/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src @@ -1,6 +1,6 @@ {application, emqx_gateway_mqttsn, [ {description, "MQTT-SN Gateway"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl index 914f837e1..6f986d603 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl @@ -23,6 +23,7 @@ -include_lib("emqx/include/types.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). %% API -export([ @@ -54,10 +55,8 @@ registry :: emqx_mqttsn_registry:registry(), %% Gateway Id gateway_id :: integer(), - %% Enable QoS3 - - %% XXX: Get confs from ctx ? - enable_qos3 :: boolean(), + %% Enable negative_qos + enable_negative_qos :: boolean(), %% MQTT-SN Connection Info conninfo :: emqx_types:conninfo(), %% MQTT-SN Client Info @@ -150,7 +149,7 @@ init( Mountpoint = maps:get(mountpoint, Option, undefined), Registry = maps:get(registry, Option), GwId = maps:get(gateway_id, Option), - EnableQoS3 = maps:get(enable_qos3, Option, true), + EnableNegQoS = maps:get(enable_qos3, Option, true), ListenerId = case maps:get(listener, Option, undefined) of undefined -> undefined; @@ -183,7 +182,7 @@ init( ctx = Ctx, registry = Registry, gateway_id = GwId, - enable_qos3 = EnableQoS3, + enable_negative_qos = EnableNegQoS, conninfo = ConnInfo, clientinfo = ClientInfo, clientinfo_override = Override, @@ -461,48 +460,61 @@ handle_in(?SN_ADVERTISE_MSG(_GwId, _Radius), Channel) -> % ignore shutdown(normal, Channel); handle_in( - ?SN_PUBLISH_MSG( - #mqtt_sn_flags{ - qos = ?QOS_NEG1, - topic_id_type = TopicIdType - }, - TopicId, - _MsgId, - Data - ), + Publish = + ?SN_PUBLISH_MSG( + #mqtt_sn_flags{ + qos = ?QOS_NEG1, + topic_id_type = TopicIdType + }, + TopicId, + MsgId, + Data + ), Channel = #channel{conn_state = idle, registry = Registry} ) -> - %% FIXME: check enable_qos3 ?? - TopicName = - case (TopicIdType =:= ?SN_SHORT_TOPIC) of - true -> - <>; - false -> - emqx_mqttsn_registry:lookup_topic( - Registry, - ?NEG_QOS_CLIENT_ID, - TopicId - ) - end, - _ = - case TopicName =/= undefined of - true -> - Msg = emqx_message:make( - ?NEG_QOS_CLIENT_ID, - ?QOS_0, - TopicName, - Data - ), - emqx_broker:publish(Msg); - false -> - ok - end, - ?SLOG(debug, #{ - msg => "receive_qo3_message_in_idle_mode", - topic => TopicName, - data => Data - }), - {ok, Channel}; + case check_negative_qos_enable(Publish, Channel) of + ok -> + TopicName = + case TopicIdType of + ?SN_SHORT_TOPIC -> + TopicId; + ?SN_PREDEFINED_TOPIC -> + emqx_mqttsn_registry:lookup_topic( + Registry, + ?NEG_QOS_CLIENT_ID, + TopicId + ); + _ -> + undefined + end, + case TopicName =/= undefined of + true -> + Msg = emqx_message:make( + ?NEG_QOS_CLIENT_ID, + ?QOS_0, + TopicName, + Data + ), + ?SLOG(debug, #{ + msg => "receive_qo3_message_in_idle_mode", + topic => TopicName, + data => Data + }), + emqx_broker:publish(Msg), + ok; + false -> + ok + end, + shutdown(normal, Channel); + {error, Rc} -> + ?tp(info, ignore_negative_qos, #{ + topic_id => TopicId, + msg_id => MsgId, + return_code => Rc + }), + PubAck = ?SN_PUBACK_MSG(TopicId, MsgId, Rc), + shutdown(normal, PubAck, Channel) + end; handle_in( Pkt = #mqtt_sn_message{type = Type}, Channel = #channel{conn_state = idle} @@ -720,7 +732,7 @@ handle_in(PubPkt = ?SN_PUBLISH_MSG(_Flags, TopicId0, MsgId, _Data), Channel) -> case emqx_utils:pipeline( [ - fun check_qos3_enable/2, + fun check_negative_qos_enable/2, fun preproc_pub_pkt/2, fun convert_topic_id_to_name/2, fun check_pub_authz/2, @@ -733,6 +745,11 @@ handle_in(PubPkt = ?SN_PUBLISH_MSG(_Flags, TopicId0, MsgId, _Data), Channel) -> {ok, Msg, NChannel} -> do_publish(TopicId, MsgId, Msg, NChannel); {error, ReturnCode, NChannel} -> + ?tp(info, publish_msg_rejected, #{ + topic_id => TopicId, + msg_id => MsgId, + return_code => ReturnCode + }), handle_out(puback, {TopicId, MsgId, ReturnCode}, NChannel) end; handle_in( @@ -1044,18 +1061,13 @@ send_next_register_or_replay_publish( %%-------------------------------------------------------------------- %% Handle Publish -check_qos3_enable( - ?SN_PUBLISH_MSG(Flags, TopicId, _MsgId, Data), - #channel{enable_qos3 = EnableQoS3} +check_negative_qos_enable( + ?SN_PUBLISH_MSG(Flags, _TopicId, _MsgId, _Data), + #channel{enable_negative_qos = EnableNegQoS} ) -> #mqtt_sn_flags{qos = QoS} = Flags, - case EnableQoS3 =:= false andalso QoS =:= ?QOS_NEG1 of + case EnableNegQoS =:= false andalso QoS =:= ?QOS_NEG1 of true -> - ?SLOG(debug, #{ - msg => "ignore_msg_due_to_qos3_disabled", - topic_id => TopicId, - data => Data - }), {error, ?SN_RC_NOT_SUPPORTED}; false -> ok diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_schema.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_schema.erl index cb33cbe95..8adf7a934 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_schema.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_schema.erl @@ -41,7 +41,6 @@ fields(mqttsn) -> desc => ?DESC(mqttsn_broadcast) } )}, - %% TODO: rename {enable_qos3, sc( boolean(), diff --git a/apps/emqx_gateway_mqttsn/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_gateway_mqttsn/test/emqx_sn_protocol_SUITE.erl index cce4ce904..5e008e63b 100644 --- a/apps/emqx_gateway_mqttsn/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_gateway_mqttsn/test/emqx_sn_protocol_SUITE.erl @@ -35,6 +35,8 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + -define(HOST, {127, 0, 0, 1}). -define(PORT, 1884). @@ -120,6 +122,20 @@ restart_mqttsn_with_subs_resume_off() -> Conf#{<<"subs_resume">> => <<"false">>} ). +restart_mqttsn_with_neg_qos_on() -> + Conf = emqx:get_raw_config([gateway, mqttsn]), + emqx_gateway_conf:update_gateway( + mqttsn, + Conf#{<<"enable_qos3">> => <<"true">>} + ). + +restart_mqttsn_with_neg_qos_off() -> + Conf = emqx:get_raw_config([gateway, mqttsn]), + emqx_gateway_conf:update_gateway( + mqttsn, + Conf#{<<"enable_qos3">> => <<"false">>} + ). + default_config() -> ?CONF_DEFAULT. @@ -464,7 +480,7 @@ t_subscribe_case08(_) -> ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), gen_udp:close(Socket). -t_publish_negqos_case09(_) -> +t_publish_negqos_enabled(_) -> Dup = 0, QoS = 0, NegQoS = 3, @@ -503,6 +519,30 @@ t_publish_negqos_case09(_) -> ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), gen_udp:close(Socket). +t_publish_negqos_disabled(_) -> + restart_mqttsn_with_neg_qos_off(), + NegQoS = 3, + MsgId = 1, + Payload = <<"abc">>, + TopicId = ?MAX_PRED_TOPIC_ID, + {ok, Socket} = gen_udp:open(0, [binary]), + ?check_trace( + begin + send_publish_msg_predefined_topic(Socket, NegQoS, MsgId, TopicId, Payload), + ?assertEqual( + <<7, ?SN_PUBACK, TopicId:16, MsgId:16, ?SN_RC_NOT_SUPPORTED>>, + receive_response(Socket) + ), + receive_response(Socket) + end, + fun(Trace0) -> + Trace = ?of_kind(ignore_negative_qos, Trace0), + ?assertMatch([#{return_code := ?SN_RC_NOT_SUPPORTED}], Trace) + end + ), + restart_mqttsn_with_neg_qos_on(), + gen_udp:close(Socket). + t_publish_qos0_case01(_) -> Dup = 0, QoS = 0,