diff --git a/apps/emqx/include/emqx_mqtt.hrl b/apps/emqx/include/emqx_mqtt.hrl index 8a40f614f..2bd27339c 100644 --- a/apps/emqx/include/emqx_mqtt.hrl +++ b/apps/emqx/include/emqx_mqtt.hrl @@ -669,6 +669,11 @@ end). end ). +-define(SHARE_EMPTY_FILTER, share_subscription_topic_cannot_be_empty). +-define(SHARE_EMPTY_GROUP, share_subscription_group_name_cannot_be_empty). +-define(SHARE_RECURSIVELY, '$share_cannot_be_used_as_real_topic_filter'). +-define(SHARE_NAME_INVALID_CHAR, share_subscription_group_name_cannot_include_wildcard). + -define(FRAME_PARSE_ERROR, frame_parse_error). -define(FRAME_SERIALIZE_ERROR, frame_serialize_error). -define(THROW_FRAME_ERROR(Reason), erlang:throw({?FRAME_PARSE_ERROR, Reason})). diff --git a/apps/emqx/src/emqx_topic.erl b/apps/emqx/src/emqx_topic.erl index f667455b4..c1d91fbf1 100644 --- a/apps/emqx/src/emqx_topic.erl +++ b/apps/emqx/src/emqx_topic.erl @@ -90,11 +90,15 @@ validate({Type, Topic}) when Type =:= name; Type =:= filter -> -spec validate(name | filter, topic()) -> true. validate(_, <<>>) -> + %% MQTT-5.0 [MQTT-4.7.3-1] error(empty_topic); validate(_, Topic) when is_binary(Topic) andalso (size(Topic) > ?MAX_TOPIC_LEN) -> + %% MQTT-5.0 [MQTT-4.7.3-3] error(topic_too_long); -validate(filter, Topic) when is_binary(Topic) -> - validate2(words(Topic)); +validate(filter, SharedFilter = <<"$share/", _Rest/binary>>) -> + validate_share(SharedFilter); +validate(filter, Filter) when is_binary(Filter) -> + validate2(words(Filter)); validate(name, Topic) when is_binary(Topic) -> Words = words(Topic), validate2(Words) andalso @@ -122,6 +126,32 @@ validate3(<>) when C == $#; C == $+; C == 0 -> validate3(<<_/utf8, Rest/binary>>) -> validate3(Rest). +validate_share(<<"$share/", Rest/binary>>) when + Rest =:= <<>> orelse Rest =:= <<"/">> +-> + %% MQTT-5.0 [MQTT-4.8.2-1] + error(?SHARE_EMPTY_FILTER); +validate_share(<<"$share/", Rest/binary>>) -> + case binary:split(Rest, <<"/">>) of + %% MQTT-5.0 [MQTT-4.8.2-1] + [<<>>, _] -> + error(?SHARE_EMPTY_GROUP); + %% MQTT-5.0 [MQTT-4.7.3-1] + [_, <<>>] -> + error(?SHARE_EMPTY_FILTER); + [ShareName, Filter] -> + validate_share(ShareName, Filter) + end. + +validate_share(_, <<"$share/", _Rest/binary>>) -> + error(?SHARE_RECURSIVELY); +validate_share(ShareName, Filter) -> + case binary:match(ShareName, [<<"+">>, <<"#">>]) of + %% MQTT-5.0 [MQTT-4.8.2-2] + nomatch -> validate2(words(Filter)); + _ -> error(?SHARE_NAME_INVALID_CHAR) + end. + %% @doc Prepend a topic prefix. %% Ensured to have only one / between prefix and suffix. prepend(undefined, W) -> diff --git a/apps/emqx/test/emqx_topic_SUITE.erl b/apps/emqx/test/emqx_topic_SUITE.erl index cbd7e5a6d..fcbf053ba 100644 --- a/apps/emqx/test/emqx_topic_SUITE.erl +++ b/apps/emqx/test/emqx_topic_SUITE.erl @@ -20,6 +20,7 @@ -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_placeholder.hrl"). -import( @@ -130,14 +131,35 @@ t_validate(_) -> true = validate({filter, <<"x">>}), true = validate({name, <<"x//y">>}), true = validate({filter, <<"sport/tennis/#">>}), + %% MQTT-5.0 [MQTT-4.7.3-1] ?assertError(empty_topic, validate({name, <<>>})), + ?assertError(empty_topic, validate({filter, <<>>})), ?assertError(topic_name_error, validate({name, <<"abc/#">>})), ?assertError(topic_too_long, validate({name, long_topic()})), - ?assertError('topic_invalid_#', validate({filter, <<"abc/#/1">>})), ?assertError(topic_invalid_char, validate({filter, <<"abc/#xzy/+">>})), ?assertError(topic_invalid_char, validate({filter, <<"abc/xzy/+9827">>})), ?assertError(topic_invalid_char, validate({filter, <<"sport/tennis#">>})), - ?assertError('topic_invalid_#', validate({filter, <<"sport/tennis/#/ranking">>})). + %% MQTT-5.0 [MQTT-4.7.1-1] + ?assertError('topic_invalid_#', validate({filter, <<"abc/#/1">>})), + ?assertError('topic_invalid_#', validate({filter, <<"sport/tennis/#/ranking">>})), + %% MQTT-5.0 [MQTT-4.8.2-1] + ?assertError(?SHARE_EMPTY_FILTER, validate({filter, <<"$share/">>})), + ?assertError(?SHARE_EMPTY_FILTER, validate({filter, <<"$share//">>})), + ?assertError(?SHARE_EMPTY_GROUP, validate({filter, <<"$share//t">>})), + ?assertError(?SHARE_EMPTY_GROUP, validate({filter, <<"$share//test">>})), + %% MQTT-5.0 [MQTT-4.7.3-1] for shared-sub + ?assertError(?SHARE_EMPTY_FILTER, validate({filter, <<"$share/g/">>})), + ?assertError(?SHARE_EMPTY_FILTER, validate({filter, <<"$share/g2/">>})), + %% MQTT-5.0 [MQTT-4.8.2-2] + ?assertError(?SHARE_NAME_INVALID_CHAR, validate({filter, <<"$share/p+q/1">>})), + ?assertError(?SHARE_NAME_INVALID_CHAR, validate({filter, <<"$share/m+/1">>})), + ?assertError(?SHARE_NAME_INVALID_CHAR, validate({filter, <<"$share/+n/1">>})), + ?assertError(?SHARE_NAME_INVALID_CHAR, validate({filter, <<"$share/x#y/1">>})), + ?assertError(?SHARE_NAME_INVALID_CHAR, validate({filter, <<"$share/x#/1">>})), + ?assertError(?SHARE_NAME_INVALID_CHAR, validate({filter, <<"$share/#y/1">>})), + %% share recursively + ?assertError(?SHARE_RECURSIVELY, validate({filter, <<"$share/g1/$share/t">>})), + true = validate({filter, <<"$share/g1/topic/$share">>}). t_sigle_level_validate(_) -> true = validate({filter, <<"+">>}),