diff --git a/apps/emqx_exproto/src/emqx_exproto_channel.erl b/apps/emqx_exproto/src/emqx_exproto_channel.erl index 44b8b64d3..130ad155a 100644 --- a/apps/emqx_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_exproto/src/emqx_exproto_channel.erl @@ -94,9 +94,6 @@ awaiting_rel_max ]). --define(CHANMOCK(P), {exproto_anonymous_client, P}). --define(CHAN_CONN_TAB, emqx_channel_conn). - %%-------------------------------------------------------------------- %% Info, Attrs and Caps %%-------------------------------------------------------------------- @@ -155,15 +152,14 @@ init(ConnInfo = #{socktype := Socktype, Channel = #channel{gcli = #{channel => GRpcChann}, conninfo = NConnInfo, clientinfo = ClientInfo, - conn_state = connecting, + conn_state = accepted, timers = #{} }, case emqx_hooks:run_fold('client.connect', [NConnInfo], #{}) of {error, _Reason} -> throw(nopermission); _ -> - ConnMod = maps:get(conn_mod, NConnInfo), - true = ets:insert(?CHAN_CONN_TAB, {?CHANMOCK(self()), ConnMod}), + ok = register_the_anonymous_client(ClientInfo, NConnInfo), Req = #{conninfo => peercert(Peercert, #{socktype => socktype(Socktype), @@ -172,6 +168,22 @@ init(ConnInfo = #{socktype := Socktype, try_dispatch(on_socket_created, wrap(Req), Channel) end. +register_the_anonymous_client(ClientInfo, ConnInfo) -> + ClientId = maps:get(clientid, ClientInfo), + case emqx_cm:open_session(true, ClientInfo, ConnInfo) of + {ok, _} -> + ?LOG(debug, "Registered an anonymous connection, " + "temporary clientid: ~s", [ClientId]), + emqx_logger:set_metadata_clientid(ClientId), + _ = self() ! {event, accepted}, + ok; + {error, Reason} -> + throw({register_anonymous_error, Reason}) + end. + +unregister_the_anonymous_client(ClientId) -> + emqx_cm:unregister_channel(ClientId). + %% @private peercert(NoSsl, ConnInfo) when NoSsl == nossl; NoSsl == undefined -> @@ -274,15 +286,14 @@ handle_call(close, Channel) -> handle_call({auth, ClientInfo, _Password}, Channel = #channel{conn_state = connected}) -> ?LOG(warning, "Duplicated authorized command, dropped ~p", [ClientInfo]), {reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel}; -handle_call({auth, ClientInfo0, Password}, +handle_call({auth, RequestedClientInfo, Password}, Channel = #channel{conninfo = ConnInfo, - clientinfo = ClientInfo}) -> - ClientInfo1 = enrich_clientinfo(ClientInfo0, ClientInfo), - NConnInfo = enrich_conninfo(ClientInfo0, ConnInfo), + clientinfo = ClientInfo0}) -> + ClientInfo1 = enrich_clientinfo(RequestedClientInfo, ClientInfo0), + NConnInfo = enrich_conninfo(RequestedClientInfo, ConnInfo), Channel1 = Channel#channel{conninfo = NConnInfo, clientinfo = ClientInfo1}, - #{clientid := ClientId, username := Username} = ClientInfo1, case emqx_access_control:authenticate(ClientInfo1#{password => Password}) of @@ -292,9 +303,10 @@ handle_call({auth, ClientInfo0, Password}, emqx_metrics:inc('client.auth.anonymous'), NClientInfo = maps:merge(ClientInfo1, AuthResult), NChannel = Channel1#channel{clientinfo = NClientInfo}, - clean_anonymous_clients(), case emqx_cm:open_session(true, NClientInfo, NConnInfo) of {ok, _Session} -> + AnonymousClientId = maps:get(clientid, ClientInfo0), + unregister_the_anonymous_client(AnonymousClientId), ?LOG(debug, "Client ~s (Username: '~s') authorized successfully!", [ClientId, Username]), {reply, ok, [{event, connected}], ensure_connected(NChannel)}; @@ -406,16 +418,12 @@ handle_info(Info, Channel) -> -spec(terminate(any(), channel()) -> channel()). terminate(Reason, Channel) -> - clean_anonymous_clients(), Req = #{reason => stringfy(Reason)}, try_dispatch(on_socket_closed, wrap(Req), Channel). is_anonymous(#{anonymous := true}) -> true; is_anonymous(_AuthResult) -> false. -clean_anonymous_clients() -> - ets:delete(?CHAN_CONN_TAB, ?CHANMOCK(self())). - packet_to_message(Topic, Qos, Payload, #channel{ conninfo = #{proto_ver := ProtoVer}, @@ -608,23 +616,32 @@ default_conninfo(ConnInfo) -> username => undefined, conn_props => #{}, connected => true, + proto_name => <<"exproto">>, + proto_ver => <<"1.0">>, connected_at => erlang:system_time(millisecond), keepalive => 0, receive_maximum => 0, expiry_interval => 0}. -default_clientinfo(#{peername := {PeerHost, _}, +default_clientinfo(#{peername := {PeerHost, PeerPort}, sockname := {_, SockPort}}) -> #{zone => external, - protocol => undefined, + protocol => exproto, peerhost => PeerHost, sockport => SockPort, - clientid => undefined, + clientid => anonymous_clientid(PeerHost, PeerPort), username => undefined, is_bridge => false, is_superuser => false, mountpoint => undefined}. +anonymous_clientid(PeerHost, PeerPort) -> + iolist_to_binary( + ["exproto-anonymous-", + inet:ntoa(PeerHost), "-", integer_to_list(PeerPort), + "-", emqx_rule_id:gen() + ]). + stringfy(Reason) -> unicode:characters_to_binary((io_lib:format("~0p", [Reason]))). diff --git a/apps/emqx_exproto/src/emqx_exproto_conn.erl b/apps/emqx_exproto/src/emqx_exproto_conn.erl index 02c0b31d6..f0cca7bec 100644 --- a/apps/emqx_exproto/src/emqx_exproto_conn.erl +++ b/apps/emqx_exproto/src/emqx_exproto_conn.erl @@ -439,7 +439,8 @@ handle_msg({close, Reason}, State) -> ?LOG(debug, "Force to close the socket due to ~p", [Reason]), handle_info({sock_closed, Reason}, close_socket(State)); -handle_msg({event, connected}, State = #state{channel = Channel}) -> +handle_msg({event, Event}, State = #state{channel = Channel}) + when Event == connected; Event == accepted -> ClientId = emqx_exproto_channel:info(clientid, Channel), emqx_cm:insert_channel_info(ClientId, info(State), stats(State)); diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 7d3dbddc8..f47fea6eb 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -271,6 +271,7 @@ format_channel_info({_Key, Info, Stats0}) -> SessCreated = maps:get(created_at, Session, maps:get(connected_at, ConnInfo)), Connected = case maps:get(conn_state, Info, connected) of connected -> true; + accepted -> true; %% for exproto anonymous clients _ -> false end, NStats = Stats#{max_subscriptions => maps:get(subscriptions_max, Stats, 0), diff --git a/src/emqx_pmon.erl b/src/emqx_pmon.erl index 1f9ba7af6..cfa366027 100644 --- a/src/emqx_pmon.erl +++ b/src/emqx_pmon.erl @@ -50,9 +50,11 @@ monitor(Pid, PMon) -> ?MODULE:monitor(Pid, undefined, PMon). -spec(monitor(pid(), term(), pmon()) -> pmon()). -monitor(Pid, Val, PMon = ?PMON(Map)) -> +monitor(Pid, Val, ?PMON(Map)) -> case maps:is_key(Pid, Map) of - true -> PMon; + true -> + {Ref, _Val} = maps:get(Pid, Map), + ?PMON(maps:put(Pid, {Ref, Val}, Map)); false -> Ref = erlang:monitor(process, Pid), ?PMON(maps:put(Pid, {Ref, Val}, Map))