From 1b6a586948710b3712f950c77c9260f5bed6d64b Mon Sep 17 00:00:00 2001 From: tigercl Date: Sat, 20 Jun 2020 15:07:37 +0800 Subject: [PATCH] feature(mqtt): support response information (#3533) --- etc/emqx.conf | 15 +++++++++++++++ priv/emqx.schema | 12 ++++++++++++ src/emqx_channel.erl | 11 +++++++++++ src/emqx_zone.erl | 6 ++++++ test/emqx_channel_SUITE.erl | 29 ++++++++++++++++++++++++++--- 5 files changed, 70 insertions(+), 3 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 7b23e46a1..b22f22c24 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -663,6 +663,11 @@ mqtt.ignore_loop_deliver = false ## Value: true | false mqtt.strict_mode = false +## Specify the response information returned to the client +## +## Value: String +## mqtt.response_information = example + ##-------------------------------------------------------------------- ## Zones ##-------------------------------------------------------------------- @@ -868,6 +873,11 @@ zone.external.ignore_loop_deliver = false ## Value: true | false zone.external.strict_mode = false +## Specify the response information returned to the client +## +## Value: String +## zone.external.response_information = example + ##-------------------------------------------------------------------- ## Internal Zone @@ -954,6 +964,11 @@ zone.internal.ignore_loop_deliver = false ## Value: true | false zone.internal.strict_mode = false +## Specify the response information returned to the client +## +## Value: String +## zone.internal.response_information = example + ## Allow the zone's clients to bypass authentication step ## ## Value: true | false diff --git a/priv/emqx.schema b/priv/emqx.schema index 1149e7c79..abd790b53 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -805,6 +805,11 @@ end}. {datatype, {enum, [true, false]}} ]}. +%% @doc Specify the response information returned to the client +{mapping, "mqtt.response_information", "emqx.response_information", [ + {datatype, string} +]}. + %%-------------------------------------------------------------------- %% Zones %%-------------------------------------------------------------------- @@ -1019,6 +1024,11 @@ end}. {datatype, {enum, [true, false]}} ]}. +%% @doc Specify the response information returned to the client +{mapping, "zone.$name.response_information", "emqx.zones", [ + {datatype, string} +]}. + %% @doc Whether to bypass the authentication step {mapping, "zone.$name.bypass_auth_plugins", "emqx.zones", [ {default, false}, @@ -1079,6 +1089,8 @@ end}. end; ("mountpoint", Val) -> {mountpoint, iolist_to_binary(Val)}; + ("response_information", Val) -> + {response_information, iolist_to_binary(Val)}; (Opt, Val) -> {list_to_atom(Opt), Val} end, diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 09117d8df..a5d02734b 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -676,6 +676,7 @@ not_nacked({deliver, _Topic, Msg}) -> handle_out(connack, {?RC_SUCCESS, SP, Props}, Channel = #channel{conninfo = ConnInfo}) -> AckProps = run_fold([fun enrich_connack_caps/2, fun enrich_server_keepalive/2, + fun enrich_response_information/2, fun enrich_assigned_clientid/2 ], Props, Channel), NAckProps = run_hooks('client.connack', [ConnInfo, emqx_reason_codes:name(?RC_SUCCESS)], AckProps), @@ -1393,6 +1394,16 @@ enrich_server_keepalive(AckProps, #channel{clientinfo = #{zone := Zone}}) -> Keepalive -> AckProps#{'Server-Keep-Alive' => Keepalive} end. +%%-------------------------------------------------------------------- +%% Enrich response information + +enrich_response_information(AckProps, #channel{conninfo = #{conn_props := ConnProps}, + clientinfo = #{zone := Zone}}) -> + case emqx_mqtt_props:get('Request-Response-Information', ConnProps, 0) of + 0 -> AckProps; + 1 -> AckProps#{'Response-Information' => emqx_zone:response_information(Zone)} + end. + %%-------------------------------------------------------------------- %% Enrich Assigned ClientId diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index cf3b14428..311ccaa77 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -45,6 +45,7 @@ , session_expiry_interval/1 , force_gc_policy/1 , force_shutdown_policy/1 + , response_information/1 , get_env/2 , get_env/3 ]}). @@ -72,6 +73,7 @@ , session_expiry_interval/1 , force_gc_policy/1 , force_shutdown_policy/1 + , response_information/1 ]). -export([ init_gc_state/1 @@ -204,6 +206,10 @@ force_gc_policy(Zone) -> force_shutdown_policy(Zone) -> get_env(Zone, force_shutdown_policy). +-spec(response_information(zone()) -> string()). +response_information(Zone) -> + get_env(Zone, response_information). + %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 5d7020825..8a48b578f 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -58,7 +58,8 @@ end_per_suite(_Config) -> emqx_session, emqx_broker, emqx_hooks, - emqx_cm + emqx_cm, + emqx_zone ]). init_per_testcase(_TestCase, Config) -> @@ -394,6 +395,27 @@ t_handle_out_connack_sucess(_) -> emqx_channel:handle_out(connack, {?RC_SUCCESS, 0, #{}}, channel()), ?assertEqual(connected, emqx_channel:info(conn_state, Channel)). +t_handle_out_connack_response_information(_) -> + ok = meck:expect(emqx_cm, open_session, + fun(true, _ClientInfo, _ConnInfo) -> + {ok, #{session => session(), present => false}} + end), + ok = meck:expect(emqx_zone, response_information, fun(_) -> test end), + IdleChannel = channel(#{conn_state => idle}), + {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, #{'Response-Information' := test})}], _} = + emqx_channel:handle_in(?CONNECT_PACKET(connpkt(#{'Request-Response-Information' => 1})), IdleChannel). + +t_handle_out_connack_not_response_information(_) -> + ok = meck:expect(emqx_cm, open_session, + fun(true, _ClientInfo, _ConnInfo) -> + {ok, #{session => session(), present => false}} + end), + ok = meck:expect(emqx_zone, response_information, fun(_) -> test end), + IdleChannel = channel(#{conn_state => idle}), + {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, AckProps)}], _} = + emqx_channel:handle_in(?CONNECT_PACKET(connpkt(#{'Request-Response-Information' => 0})), IdleChannel), + ?assertEqual(false, maps:is_key('Response-Information', AckProps)). + t_handle_out_connack_failure(_) -> {shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _Chan} = emqx_channel:handle_out(connack, ?RC_NOT_AUTHORIZED, channel()). @@ -639,14 +661,15 @@ clientinfo(InitProps) -> topic_filters() -> [{<<"+">>, ?DEFAULT_SUBOPTS}, {<<"#">>, ?DEFAULT_SUBOPTS}]. -connpkt() -> +connpkt() -> connpkt(#{}). +connpkt(Props) -> #mqtt_packet_connect{ proto_name = <<"MQTT">>, proto_ver = ?MQTT_PROTO_V4, is_bridge = false, clean_start = true, keepalive = 30, - properties = #{}, + properties = Props, clientid = <<"clientid">>, username = <<"username">>, password = <<"passwd">>