diff --git a/etc/emqx.conf b/etc/emqx.conf index 341056344..0dc1b314e 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1853,12 +1853,36 @@ module.presence.qos = 1 module.subscription = off ## Subscribe the Topics automatically when client connected. -## module.subscription.1.topic = $client/%c -## Qos of the subscription: 0 | 1 | 2 -## module.subscription.1.qos = 1 +## +## Value: String +## module.subscription.1.topic = connected/%c/%u -## module.subscription.2.topic = $user/%u -## module.subscription.2.qos = 1 +## Qos of the proxy subscription. +## +## Value: 0 | 1 | 2 +## Default: 0 +## module.subscription.1.qos = 0 + +## No Local of the proxy subscription options. +## This configuration only takes effect in the MQTT V5 protocol. +## +## Value: 0 | 1 +## Default: 0 +## module.subscription.1.nl = 0 + +## Retain As Published of the proxy subscription options. +## This configuration only takes effect in the MQTT V5 protocol. +## +## Value: 0 | 1 +## Default: 0 +## module.subscription.1.rap = 0 + +## Retain Handling of the proxy subscription options. +## This configuration only takes effect in the MQTT V5 protocol. +## +## Value: 0 | 1 | 2 +## Default: 0 +## module.subscription.1.rh = 0 ##-------------------------------------------------------------------- ## Rewrite Module diff --git a/priv/emqx.schema b/priv/emqx.schema index 024e14815..2f862c9bd 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1,5 +1,5 @@ %%-*- mode: erlang -*- -%% EMQ X R3.0 config mapping +%% EMQ X R4.0 config mapping %%-------------------------------------------------------------------- %% Cluster @@ -1814,6 +1814,24 @@ end}. {validators, ["range:0-2"]} ]}. +{mapping, "module.subscription.$id.nl", "emqx.modules", [ + {default, 0}, + {datatype, integer}, + {validators, ["range:0-1"]} +]}. + +{mapping, "module.subscription.$id.rap", "emqx.modules", [ + {default, 0}, + {datatype, integer}, + {validators, ["range:0-1"]} +]}. + +{mapping, "module.subscription.$id.rh", "emqx.modules", [ + {default, 0}, + {datatype, integer}, + {validators, ["range:0-2"]} +]}. + {mapping, "module.rewrite", "emqx.modules", [ {default, off}, {datatype, flag} @@ -1826,10 +1844,12 @@ end}. {translation, "emqx.modules", fun(Conf) -> Subscriptions = fun() -> List = cuttlefish_variable:filter_by_prefix("module.subscription", Conf), - QosList = [Qos || {_, Qos} <- lists:sort([{I, Qos} || {[_,"subscription", I,"qos"], Qos} <- List])], - TopicList = [iolist_to_binary(Topic) || {_, Topic} <- - lists:sort([{I, Topic} || {[_,"subscription", I, "topic"], Topic} <- List])], - lists:zip(TopicList, QosList) + TopicList = [{N, Topic}|| {[_,"subscription",N,"topic"], Topic} <- List], + [{iolist_to_binary(T), #{ qos => cuttlefish:conf_get("module.subscription." ++ N ++ ".qos", Conf, 0), + nl => cuttlefish:conf_get("module.subscription." ++ N ++ ".nl", Conf, 0), + rap => cuttlefish:conf_get("module.subscription." ++ N ++ ".rap", Conf, 0), + rh => cuttlefish:conf_get("module.subscription." ++ N ++ ".rh", Conf, 0) + }} || {N, T} <- TopicList] end, Rewrites = fun() -> Rules = cuttlefish_variable:filter_by_prefix("module.rewrite.rule", Conf), diff --git a/src/emqx_mod_subscription.erl b/src/emqx_mod_subscription.erl index d17162688..79bb8dc63 100644 --- a/src/emqx_mod_subscription.erl +++ b/src/emqx_mod_subscription.erl @@ -36,11 +36,14 @@ load(Topics) -> emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Topics]}). -on_client_connected(#{clientid := ClientId, username := Username}, _ConnInfo, Topics) -> +on_client_connected(#{clientid := ClientId, username := Username}, _ConnInfo = #{proto_ver := ProtoVer}, Topics) -> Replace = fun(Topic) -> rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic)) end, - TopicFilters = [{Replace(Topic), #{qos => QoS}} || {Topic, QoS} <- Topics], + TopicFilters = case ProtoVer of + ?MQTT_PROTO_V5 -> [{Replace(Topic), SubOpts} || {Topic, SubOpts} <- Topics]; + _ -> [{Replace(Topic), #{qos => Qos}} || {Topic, #{qos := Qos}} <- Topics] + end, self() ! {subscribe, TopicFilters}. unload(_) -> diff --git a/test/emqx_mod_subscription_SUITE.erl b/test/emqx_mod_subscription_SUITE.erl index 1b0e98f40..06cd17cea 100644 --- a/test/emqx_mod_subscription_SUITE.erl +++ b/test/emqx_mod_subscription_SUITE.erl @@ -33,7 +33,7 @@ end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]). t_on_client_connected(_) -> - ?assertEqual(ok, emqx_mod_subscription:load([{<<"connected/%c/%u">>, ?QOS_0}])), + ?assertEqual(ok, emqx_mod_subscription:load([{<<"connected/%c/%u">>, #{qos => ?QOS_0}}])), {ok, C} = emqtt:start_link([{host, "localhost"}, {clientid, "myclient"}, {username, "admin"}]), @@ -43,18 +43,42 @@ t_on_client_connected(_) -> ?assertEqual(<<"connected/myclient/admin">>, Topic), ?assertEqual(<<"Hello world">>, Payload), ok = emqtt:disconnect(C), - ?assertEqual(ok, emqx_mod_subscription:unload([{<<"connected/%c/%u">>, ?QOS_0}])). + ?assertEqual(ok, emqx_mod_subscription:unload([{<<"connected/%c/%u">>, #{qos => ?QOS_0}}])). t_on_undefined_client_connected(_) -> - ?assertEqual(ok, emqx_mod_subscription:load([{<<"connected/undefined">>, ?QOS_0}])), + ?assertEqual(ok, emqx_mod_subscription:load([{<<"connected/undefined">>, #{qos => ?QOS_1}}])), {ok, C} = emqtt:start_link([{host, "localhost"}]), {ok, _} = emqtt:connect(C), - emqtt:publish(C, <<"connected/undefined">>, <<"Hello world">>, ?QOS_0), + emqtt:publish(C, <<"connected/undefined">>, <<"Hello world">>, ?QOS_1), {ok, #{topic := Topic, payload := Payload}} = receive_publish(100), ?assertEqual(<<"connected/undefined">>, Topic), ?assertEqual(<<"Hello world">>, Payload), ok = emqtt:disconnect(C), - ?assertEqual(ok, emqx_mod_subscription:unload([{<<"connected/undefined">>, ?QOS_0}])). + ?assertEqual(ok, emqx_mod_subscription:unload([{<<"connected/undefined">>, #{qos => ?QOS_1}}])). + +t_suboption(_) -> + Client_info = fun(Key, Client) -> maps:get(Key, maps:from_list(emqtt:info(Client)), undefined) end, + Suboption = #{qos => ?QOS_2, nl => 1, rap => 1, rh => 2}, + ?assertEqual(ok, emqx_mod_subscription:load([{<<"connected/%c/%u">>, Suboption}])), + {ok, C1} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), + timer:sleep(200), + [CPid1] = emqx_cm:lookup_channels(Client_info(clientid, C1)), + [ Sub1 | _ ] = ets:lookup(emqx_subscription,CPid1), + [ Suboption1 | _ ] = ets:lookup(emqx_suboption,Sub1), + ?assertMatch({Sub1, #{qos := 2, nl := 1, rap := 1, rh := 2, subid := _}}, Suboption1), + ok = emqtt:disconnect(C1), + %% The subscription option is not valid for MQTT V3.1.1 + {ok, C2} = emqtt:start_link([{proto_ver, v4}]), + {ok, _} = emqtt:connect(C2), + timer:sleep(200), + [CPid2] = emqx_cm:lookup_channels(Client_info(clientid, C2)), + [ Sub2 | _ ] = ets:lookup(emqx_subscription,CPid2), + [ Suboption2 | _ ] = ets:lookup(emqx_suboption,Sub2), + ok = emqtt:disconnect(C2), + ?assertMatch({Sub2, #{qos := 2, nl := 0, rap := 0, rh := 0, subid := _}}, Suboption2), + + ?assertEqual(ok, emqx_mod_subscription:unload([{<<"connected/undefined">>, Suboption}])). %%-------------------------------------------------------------------- %% Internal functions @@ -66,4 +90,3 @@ receive_publish(Timeout) -> after Timeout -> {error, timeout} end. -