diff --git a/etc/emqx.conf b/etc/emqx.conf index f3af119ce..1d6603091 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -649,6 +649,12 @@ zone.external.mqueue_store_qos0 = true ## Value: String ## zone.external.mountpoint = devicebound/ +## Whether use username replace client id +## +## Value: boolean +## Default: false +zone.external.use_username_as_clientid = false + ##-------------------------------------------------------------------- ## Internal Zone @@ -708,6 +714,12 @@ zone.internal.mqueue_store_qos0 = true ## Value: String ## zone.internal.mountpoint = cloudbound/ +## Whether use username replace client id +## +## Value: boolean +## Default: false +zone.internal.use_username_as_clientid = false + ##-------------------------------------------------------------------- ## Listeners ##-------------------------------------------------------------------- diff --git a/priv/emqx.schema b/priv/emqx.schema index fce0f5c76..56f393cd5 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -794,6 +794,12 @@ end}. {datatype, string} ]}. +%% @doc Use username replace client id +{mapping, "zone.$name.use_username_as_clientid", "emqx.zones", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + {translation, "emqx.zones", fun(Conf) -> Mapping = fun("retain_available", Val) -> {mqtt_retain_available, Val}; diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 44b0925c3..f26445571 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -289,13 +289,17 @@ process_packet(?CONNECT_PACKET( client_id = ClientId, username = Username, password = Password} = Connect), PState) -> - emqx_logger:add_metadata_client_id(ClientId), + + NewClientId = maybe_use_username_as_clientid(ClientId, Username, PState), + + emqx_logger:add_metadata_client_id(NewClientId), + %% TODO: Mountpoint... %% Msg -> emqx_mountpoint:mount(MountPoint, Msg) WillMsg = make_will_msg(Connect), PState1 = set_username(Username, - PState#pstate{client_id = ClientId, + PState#pstate{client_id = NewClientId, proto_ver = ProtoVer, proto_name = ProtoName, clean_start = CleanStart, @@ -607,6 +611,19 @@ send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun {error, Reason} end. +%%------------------------------------------------------------------------------ +%% Maybe use username replace client id + +maybe_use_username_as_clientid(ClientId, undefined, _PState) -> + ClientId; +maybe_use_username_as_clientid(ClientId, Username, #pstate{zone = Zone}) -> + case emqx_zone:get_env(Zone, use_username_as_clientid, false) of + true -> + Username; + false -> + ClientId + end. + %%------------------------------------------------------------------------------ %% Assign a clientid diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index 3f6c4f252..ae308ea42 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -60,22 +60,45 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_broker_helpers:run_teardown_steps(). -with_connection(DoFun) -> +batch_connect(NumberOfConnections) -> + batch_connect([], NumberOfConnections). + +batch_connect(Socks, 0) -> + Socks; +batch_connect(Socks, NumberOfConnections) -> {ok, Sock} = emqx_client_sock:connect({127, 0, 0, 1}, 1883, - [binary, {packet, raw}, - {active, false}], 3000), + [binary, {packet, raw}, {active, false}], + 3000), + batch_connect([Sock | Socks], NumberOfConnections - 1). + +with_connection(DoFun, NumberOfConnections) -> + Socks = batch_connect(NumberOfConnections), try - DoFun(Sock) + DoFun(Socks) after - emqx_client_sock:close(Sock) + lists:foreach(fun(Sock) -> + emqx_client_sock:close(Sock) + end, Socks) end. +with_connection(DoFun) -> + with_connection(DoFun, 1). + + % {ok, Sock} = emqx_client_sock:connect({127, 0, 0, 1}, 1883, + % [binary, {packet, raw}, + % {active, false}], 3000), + % try + % DoFun(Sock) + % after + % emqx_client_sock:close(Sock) + % end. + connect_v4(_) -> - with_connection(fun(Sock) -> + with_connection(fun([Sock]) -> emqx_client_sock:send(Sock, raw_send_serialize(?PACKET(?PUBLISH))), {error, closed} =gen_tcp:recv(Sock, 0) end), - with_connection(fun(Sock) -> + with_connection(fun([Sock]) -> ConnectPacket = raw_send_serialize(?CONNECT_PACKET (#mqtt_packet_connect{ client_id = <<"mqttv4_client">>, @@ -94,7 +117,7 @@ connect_v4(_) -> connect_v5(_) -> - with_connection(fun(Sock) -> + with_connection(fun([Sock]) -> emqx_client_sock:send(Sock, raw_send_serialize( ?CONNECT_PACKET(#mqtt_packet_connect{ @@ -105,7 +128,7 @@ connect_v5(_) -> {ok, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5) end), - with_connection(fun(Sock) -> + with_connection(fun([Sock]) -> emqx_client_sock:send(Sock, raw_send_serialize( ?CONNECT_PACKET( @@ -117,7 +140,7 @@ connect_v5(_) -> {ok, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5) end), - with_connection(fun(Sock) -> + with_connection(fun([Sock]) -> emqx_client_sock:send(Sock, raw_send_serialize( ?CONNECT_PACKET( @@ -133,7 +156,7 @@ connect_v5(_) -> end), % test clean start - with_connection(fun(Sock) -> + with_connection(fun([Sock]) -> emqx_client_sock:send(Sock, raw_send_serialize( ?CONNECT_PACKET( @@ -153,7 +176,7 @@ connect_v5(_) -> timer:sleep(1000), - with_connection(fun(Sock) -> + with_connection(fun([Sock]) -> emqx_client_sock:send(Sock, raw_send_serialize( ?CONNECT_PACKET( @@ -169,7 +192,7 @@ connect_v5(_) -> end), % test will message publish and cancel - with_connection(fun(Sock) -> + with_connection(fun([Sock]) -> emqx_client_sock:send(Sock, raw_send_serialize( ?CONNECT_PACKET( @@ -250,6 +273,59 @@ connect_v5(_) -> emqx_client_sock:close(Sock2) end), + + % duplicate client id + with_connection(fun([Sock, Sock1]) -> + emqx_zone:set_env(external, use_username_as_clientid, true), + emqx_client_sock:send(Sock, + raw_send_serialize( + ?CONNECT_PACKET( + #mqtt_packet_connect{ + proto_ver = ?MQTT_PROTO_V5, + clean_start = true, + client_id = <<"myclient">>, + properties = + #{'Session-Expiry-Interval' => 10}}) + )), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), + + emqx_client_sock:send(Sock1, + raw_send_serialize( + ?CONNECT_PACKET( + #mqtt_packet_connect{ + proto_ver = ?MQTT_PROTO_V5, + clean_start = false, + client_id = <<"myclient">>, + username = <<"admin">>, + password = <<"public">>, + properties = + #{'Session-Expiry-Interval' => 10}}) + )), + {ok, Data1} = gen_tcp:recv(Sock1, 0), + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data1, ?MQTT_PROTO_V5), + + emqx_client_sock:send(Sock, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh => 1, + qos => ?QOS_2, + rap => 0, + nl => 0, + rc => 0}}]), + #{version => ?MQTT_PROTO_V5})), + + {ok, SubData} = gen_tcp:recv(Sock, 0), + {ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(SubData, ?MQTT_PROTO_V5), + + emqx_client_sock:send(Sock1, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh => 1, + qos => ?QOS_2, + rap => 0, + nl => 0, + rc => 0}}]), + #{version => ?MQTT_PROTO_V5})), + + {ok, SubData1} = gen_tcp:recv(Sock1, 0), + {ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(SubData1, ?MQTT_PROTO_V5) + end, 2), + ok. do_connect(Sock, ProtoVer) -> @@ -263,7 +339,7 @@ do_connect(Sock, ProtoVer) -> {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_parse(Data, ProtoVer). subscribe_v4(_) -> - with_connection(fun(Sock) -> + with_connection(fun([Sock]) -> do_connect(Sock, ?MQTT_PROTO_V4), SubPacket = raw_send_serialize( ?SUBSCRIBE_PACKET(15, @@ -279,7 +355,7 @@ subscribe_v4(_) -> ok. subscribe_v5(_) -> - with_connection(fun(Sock) -> + with_connection(fun([Sock]) -> do_connect(Sock, ?MQTT_PROTO_V5), SubPacket = raw_send_serialize(?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => -1},[]), #{version => ?MQTT_PROTO_V5}), @@ -288,7 +364,7 @@ subscribe_v5(_) -> {ok, ?DISCONNECT_PACKET(?RC_TOPIC_FILTER_INVALID), _} = raw_recv_parse(DisConnData, ?MQTT_PROTO_V5) end), - with_connection(fun(Sock) -> + with_connection(fun([Sock]) -> do_connect(Sock, ?MQTT_PROTO_V5), SubPacket = raw_send_serialize( ?SUBSCRIBE_PACKET(0, #{}, [{<<"TopicQos0">>, @@ -301,7 +377,7 @@ subscribe_v5(_) -> {ok, ?DISCONNECT_PACKET(?RC_MALFORMED_PACKET), _} = raw_recv_parse(DisConnData, ?MQTT_PROTO_V5) end), - with_connection(fun(Sock) -> + with_connection(fun([Sock]) -> do_connect(Sock, ?MQTT_PROTO_V5), SubPacket = raw_send_serialize( ?SUBSCRIBE_PACKET(1, #{'Subscription-Identifier' => 0}, @@ -315,7 +391,7 @@ subscribe_v5(_) -> {ok, ?DISCONNECT_PACKET(?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED), _} = raw_recv_parse(DisConnData, ?MQTT_PROTO_V5) end), - with_connection(fun(Sock) -> + with_connection(fun([Sock]) -> do_connect(Sock, ?MQTT_PROTO_V5), SubPacket = raw_send_serialize( ?SUBSCRIBE_PACKET(1, #{'Subscription-Identifier' => 1},