refactor(exproto): pre-register the anonymous client

This commit is contained in:
JianBo He 2022-01-28 12:47:57 +08:00
parent f42c04d6fc
commit b802bcb6fc
4 changed files with 43 additions and 22 deletions

View File

@ -94,9 +94,6 @@
awaiting_rel_max awaiting_rel_max
]). ]).
-define(CHANMOCK(P), {exproto_anonymous_client, P}).
-define(CHAN_CONN_TAB, emqx_channel_conn).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Info, Attrs and Caps %% Info, Attrs and Caps
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -155,15 +152,14 @@ init(ConnInfo = #{socktype := Socktype,
Channel = #channel{gcli = #{channel => GRpcChann}, Channel = #channel{gcli = #{channel => GRpcChann},
conninfo = NConnInfo, conninfo = NConnInfo,
clientinfo = ClientInfo, clientinfo = ClientInfo,
conn_state = connecting, conn_state = accepted,
timers = #{} timers = #{}
}, },
case emqx_hooks:run_fold('client.connect', [NConnInfo], #{}) of case emqx_hooks:run_fold('client.connect', [NConnInfo], #{}) of
{error, _Reason} -> {error, _Reason} ->
throw(nopermission); throw(nopermission);
_ -> _ ->
ConnMod = maps:get(conn_mod, NConnInfo), ok = register_the_anonymous_client(ClientInfo, NConnInfo),
true = ets:insert(?CHAN_CONN_TAB, {?CHANMOCK(self()), ConnMod}),
Req = #{conninfo => Req = #{conninfo =>
peercert(Peercert, peercert(Peercert,
#{socktype => socktype(Socktype), #{socktype => socktype(Socktype),
@ -172,6 +168,22 @@ init(ConnInfo = #{socktype := Socktype,
try_dispatch(on_socket_created, wrap(Req), Channel) try_dispatch(on_socket_created, wrap(Req), Channel)
end. end.
register_the_anonymous_client(ClientInfo, ConnInfo) ->
ClientId = maps:get(clientid, ClientInfo),
case emqx_cm:open_session(true, ClientInfo, ConnInfo) of
{ok, _} ->
?LOG(debug, "Registered an anonymous connection, "
"temporary clientid: ~s", [ClientId]),
emqx_logger:set_metadata_clientid(ClientId),
_ = self() ! {event, accepted},
ok;
{error, Reason} ->
throw({register_anonymous_error, Reason})
end.
unregister_the_anonymous_client(ClientId) ->
emqx_cm:unregister_channel(ClientId).
%% @private %% @private
peercert(NoSsl, ConnInfo) when NoSsl == nossl; peercert(NoSsl, ConnInfo) when NoSsl == nossl;
NoSsl == undefined -> NoSsl == undefined ->
@ -274,15 +286,14 @@ handle_call(close, Channel) ->
handle_call({auth, ClientInfo, _Password}, Channel = #channel{conn_state = connected}) -> handle_call({auth, ClientInfo, _Password}, Channel = #channel{conn_state = connected}) ->
?LOG(warning, "Duplicated authorized command, dropped ~p", [ClientInfo]), ?LOG(warning, "Duplicated authorized command, dropped ~p", [ClientInfo]),
{reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel}; {reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel};
handle_call({auth, ClientInfo0, Password}, handle_call({auth, RequestedClientInfo, Password},
Channel = #channel{conninfo = ConnInfo, Channel = #channel{conninfo = ConnInfo,
clientinfo = ClientInfo}) -> clientinfo = ClientInfo0}) ->
ClientInfo1 = enrich_clientinfo(ClientInfo0, ClientInfo), ClientInfo1 = enrich_clientinfo(RequestedClientInfo, ClientInfo0),
NConnInfo = enrich_conninfo(ClientInfo0, ConnInfo), NConnInfo = enrich_conninfo(RequestedClientInfo, ConnInfo),
Channel1 = Channel#channel{conninfo = NConnInfo, Channel1 = Channel#channel{conninfo = NConnInfo,
clientinfo = ClientInfo1}, clientinfo = ClientInfo1},
#{clientid := ClientId, username := Username} = ClientInfo1, #{clientid := ClientId, username := Username} = ClientInfo1,
case emqx_access_control:authenticate(ClientInfo1#{password => Password}) of case emqx_access_control:authenticate(ClientInfo1#{password => Password}) of
@ -292,9 +303,10 @@ handle_call({auth, ClientInfo0, Password},
emqx_metrics:inc('client.auth.anonymous'), emqx_metrics:inc('client.auth.anonymous'),
NClientInfo = maps:merge(ClientInfo1, AuthResult), NClientInfo = maps:merge(ClientInfo1, AuthResult),
NChannel = Channel1#channel{clientinfo = NClientInfo}, NChannel = Channel1#channel{clientinfo = NClientInfo},
clean_anonymous_clients(),
case emqx_cm:open_session(true, NClientInfo, NConnInfo) of case emqx_cm:open_session(true, NClientInfo, NConnInfo) of
{ok, _Session} -> {ok, _Session} ->
AnonymousClientId = maps:get(clientid, ClientInfo0),
unregister_the_anonymous_client(AnonymousClientId),
?LOG(debug, "Client ~s (Username: '~s') authorized successfully!", ?LOG(debug, "Client ~s (Username: '~s') authorized successfully!",
[ClientId, Username]), [ClientId, Username]),
{reply, ok, [{event, connected}], ensure_connected(NChannel)}; {reply, ok, [{event, connected}], ensure_connected(NChannel)};
@ -406,16 +418,12 @@ handle_info(Info, Channel) ->
-spec(terminate(any(), channel()) -> channel()). -spec(terminate(any(), channel()) -> channel()).
terminate(Reason, Channel) -> terminate(Reason, Channel) ->
clean_anonymous_clients(),
Req = #{reason => stringfy(Reason)}, Req = #{reason => stringfy(Reason)},
try_dispatch(on_socket_closed, wrap(Req), Channel). try_dispatch(on_socket_closed, wrap(Req), Channel).
is_anonymous(#{anonymous := true}) -> true; is_anonymous(#{anonymous := true}) -> true;
is_anonymous(_AuthResult) -> false. is_anonymous(_AuthResult) -> false.
clean_anonymous_clients() ->
ets:delete(?CHAN_CONN_TAB, ?CHANMOCK(self())).
packet_to_message(Topic, Qos, Payload, packet_to_message(Topic, Qos, Payload,
#channel{ #channel{
conninfo = #{proto_ver := ProtoVer}, conninfo = #{proto_ver := ProtoVer},
@ -608,23 +616,32 @@ default_conninfo(ConnInfo) ->
username => undefined, username => undefined,
conn_props => #{}, conn_props => #{},
connected => true, connected => true,
proto_name => <<"exproto">>,
proto_ver => <<"1.0">>,
connected_at => erlang:system_time(millisecond), connected_at => erlang:system_time(millisecond),
keepalive => 0, keepalive => 0,
receive_maximum => 0, receive_maximum => 0,
expiry_interval => 0}. expiry_interval => 0}.
default_clientinfo(#{peername := {PeerHost, _}, default_clientinfo(#{peername := {PeerHost, PeerPort},
sockname := {_, SockPort}}) -> sockname := {_, SockPort}}) ->
#{zone => external, #{zone => external,
protocol => undefined, protocol => exproto,
peerhost => PeerHost, peerhost => PeerHost,
sockport => SockPort, sockport => SockPort,
clientid => undefined, clientid => anonymous_clientid(PeerHost, PeerPort),
username => undefined, username => undefined,
is_bridge => false, is_bridge => false,
is_superuser => false, is_superuser => false,
mountpoint => undefined}. mountpoint => undefined}.
anonymous_clientid(PeerHost, PeerPort) ->
iolist_to_binary(
["exproto-anonymous-",
inet:ntoa(PeerHost), "-", integer_to_list(PeerPort),
"-", emqx_rule_id:gen()
]).
stringfy(Reason) -> stringfy(Reason) ->
unicode:characters_to_binary((io_lib:format("~0p", [Reason]))). unicode:characters_to_binary((io_lib:format("~0p", [Reason]))).

View File

@ -439,7 +439,8 @@ handle_msg({close, Reason}, State) ->
?LOG(debug, "Force to close the socket due to ~p", [Reason]), ?LOG(debug, "Force to close the socket due to ~p", [Reason]),
handle_info({sock_closed, Reason}, close_socket(State)); handle_info({sock_closed, Reason}, close_socket(State));
handle_msg({event, connected}, State = #state{channel = Channel}) -> handle_msg({event, Event}, State = #state{channel = Channel})
when Event == connected; Event == accepted ->
ClientId = emqx_exproto_channel:info(clientid, Channel), ClientId = emqx_exproto_channel:info(clientid, Channel),
emqx_cm:insert_channel_info(ClientId, info(State), stats(State)); emqx_cm:insert_channel_info(ClientId, info(State), stats(State));

View File

@ -271,6 +271,7 @@ format_channel_info({_Key, Info, Stats0}) ->
SessCreated = maps:get(created_at, Session, maps:get(connected_at, ConnInfo)), SessCreated = maps:get(created_at, Session, maps:get(connected_at, ConnInfo)),
Connected = case maps:get(conn_state, Info, connected) of Connected = case maps:get(conn_state, Info, connected) of
connected -> true; connected -> true;
accepted -> true; %% for exproto anonymous clients
_ -> false _ -> false
end, end,
NStats = Stats#{max_subscriptions => maps:get(subscriptions_max, Stats, 0), NStats = Stats#{max_subscriptions => maps:get(subscriptions_max, Stats, 0),

View File

@ -50,9 +50,11 @@ monitor(Pid, PMon) ->
?MODULE:monitor(Pid, undefined, PMon). ?MODULE:monitor(Pid, undefined, PMon).
-spec(monitor(pid(), term(), pmon()) -> pmon()). -spec(monitor(pid(), term(), pmon()) -> pmon()).
monitor(Pid, Val, PMon = ?PMON(Map)) -> monitor(Pid, Val, ?PMON(Map)) ->
case maps:is_key(Pid, Map) of case maps:is_key(Pid, Map) of
true -> PMon; true ->
{Ref, _Val} = maps:get(Pid, Map),
?PMON(maps:put(Pid, {Ref, Val}, Map));
false -> false ->
Ref = erlang:monitor(process, Pid), Ref = erlang:monitor(process, Pid),
?PMON(maps:put(Pid, {Ref, Val}, Map)) ?PMON(maps:put(Pid, {Ref, Val}, Map))