diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 7542e0ef3..e38cb1b0f 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -21,11 +21,13 @@ File format: * CLI `emqx_ctl pem_cache clean` to force purge x509 certificate cache, to force an immediate reload of all certificates after the files are updated on disk. +* Refactor the ExProto so that anonymous clients can also be displayed on the dashboard [#6983] ### Bug fixes * Fix case where publishing to a non-existent topic alias would crash the connection [#6979] * Fix HTTP-API 500 error on querying the lwm2m client list on the another node [#7009] +* Fix the ExProto connection registry is not released after the client process abnormally exits [#6983] * Fix Server-KeepAlive wrongly applied on MQTT v3.0/v3.1 [#7085] * Fix Stomp client can not trigger `$event/client_connection` message [#7096] diff --git a/apps/emqx_exproto/src/emqx_exproto.app.src b/apps/emqx_exproto/src/emqx_exproto.app.src index 0ed54e6fd..4ce137fc5 100644 --- a/apps/emqx_exproto/src/emqx_exproto.app.src +++ b/apps/emqx_exproto/src/emqx_exproto.app.src @@ -1,6 +1,6 @@ {application, emqx_exproto, [{description, "EMQ X Extension for Protocol"}, - {vsn, "4.3.5"}, %% 4.3.3 is used by ee + {vsn, "4.3.6"}, %% 4.3.3 is used by ee {modules, []}, {registered, []}, {mod, {emqx_exproto_app, []}}, diff --git a/apps/emqx_exproto/src/emqx_exproto.appup.src b/apps/emqx_exproto/src/emqx_exproto.appup.src index 7da28e773..590a2106f 100644 --- a/apps/emqx_exproto/src/emqx_exproto.appup.src +++ b/apps/emqx_exproto/src/emqx_exproto.appup.src @@ -1,11 +1,6 @@ %% -*- mode: erlang -*- {VSN, - [{"4.3.4", - [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, - {"4.3.3", - [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, - {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, - {"4.3.2", + [{<<"4\\.3\\.[2-5]">>, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {<<"4.3.[0-1]">>, @@ -14,12 +9,7 @@ {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.3.4", - [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, - {"4.3.3", - [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, - {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, - {"4.3.2", + [{<<"4\\.3\\.[2-5]">>, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {<<"4.3.[0-1]">>, diff --git a/apps/emqx_exproto/src/emqx_exproto_channel.erl b/apps/emqx_exproto/src/emqx_exproto_channel.erl index 44b8b64d3..130ad155a 100644 --- a/apps/emqx_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_exproto/src/emqx_exproto_channel.erl @@ -94,9 +94,6 @@ awaiting_rel_max ]). --define(CHANMOCK(P), {exproto_anonymous_client, P}). --define(CHAN_CONN_TAB, emqx_channel_conn). - %%-------------------------------------------------------------------- %% Info, Attrs and Caps %%-------------------------------------------------------------------- @@ -155,15 +152,14 @@ init(ConnInfo = #{socktype := Socktype, Channel = #channel{gcli = #{channel => GRpcChann}, conninfo = NConnInfo, clientinfo = ClientInfo, - conn_state = connecting, + conn_state = accepted, timers = #{} }, case emqx_hooks:run_fold('client.connect', [NConnInfo], #{}) of {error, _Reason} -> throw(nopermission); _ -> - ConnMod = maps:get(conn_mod, NConnInfo), - true = ets:insert(?CHAN_CONN_TAB, {?CHANMOCK(self()), ConnMod}), + ok = register_the_anonymous_client(ClientInfo, NConnInfo), Req = #{conninfo => peercert(Peercert, #{socktype => socktype(Socktype), @@ -172,6 +168,22 @@ init(ConnInfo = #{socktype := Socktype, try_dispatch(on_socket_created, wrap(Req), Channel) end. +register_the_anonymous_client(ClientInfo, ConnInfo) -> + ClientId = maps:get(clientid, ClientInfo), + case emqx_cm:open_session(true, ClientInfo, ConnInfo) of + {ok, _} -> + ?LOG(debug, "Registered an anonymous connection, " + "temporary clientid: ~s", [ClientId]), + emqx_logger:set_metadata_clientid(ClientId), + _ = self() ! {event, accepted}, + ok; + {error, Reason} -> + throw({register_anonymous_error, Reason}) + end. + +unregister_the_anonymous_client(ClientId) -> + emqx_cm:unregister_channel(ClientId). + %% @private peercert(NoSsl, ConnInfo) when NoSsl == nossl; NoSsl == undefined -> @@ -274,15 +286,14 @@ handle_call(close, Channel) -> handle_call({auth, ClientInfo, _Password}, Channel = #channel{conn_state = connected}) -> ?LOG(warning, "Duplicated authorized command, dropped ~p", [ClientInfo]), {reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel}; -handle_call({auth, ClientInfo0, Password}, +handle_call({auth, RequestedClientInfo, Password}, Channel = #channel{conninfo = ConnInfo, - clientinfo = ClientInfo}) -> - ClientInfo1 = enrich_clientinfo(ClientInfo0, ClientInfo), - NConnInfo = enrich_conninfo(ClientInfo0, ConnInfo), + clientinfo = ClientInfo0}) -> + ClientInfo1 = enrich_clientinfo(RequestedClientInfo, ClientInfo0), + NConnInfo = enrich_conninfo(RequestedClientInfo, ConnInfo), Channel1 = Channel#channel{conninfo = NConnInfo, clientinfo = ClientInfo1}, - #{clientid := ClientId, username := Username} = ClientInfo1, case emqx_access_control:authenticate(ClientInfo1#{password => Password}) of @@ -292,9 +303,10 @@ handle_call({auth, ClientInfo0, Password}, emqx_metrics:inc('client.auth.anonymous'), NClientInfo = maps:merge(ClientInfo1, AuthResult), NChannel = Channel1#channel{clientinfo = NClientInfo}, - clean_anonymous_clients(), case emqx_cm:open_session(true, NClientInfo, NConnInfo) of {ok, _Session} -> + AnonymousClientId = maps:get(clientid, ClientInfo0), + unregister_the_anonymous_client(AnonymousClientId), ?LOG(debug, "Client ~s (Username: '~s') authorized successfully!", [ClientId, Username]), {reply, ok, [{event, connected}], ensure_connected(NChannel)}; @@ -406,16 +418,12 @@ handle_info(Info, Channel) -> -spec(terminate(any(), channel()) -> channel()). terminate(Reason, Channel) -> - clean_anonymous_clients(), Req = #{reason => stringfy(Reason)}, try_dispatch(on_socket_closed, wrap(Req), Channel). is_anonymous(#{anonymous := true}) -> true; is_anonymous(_AuthResult) -> false. -clean_anonymous_clients() -> - ets:delete(?CHAN_CONN_TAB, ?CHANMOCK(self())). - packet_to_message(Topic, Qos, Payload, #channel{ conninfo = #{proto_ver := ProtoVer}, @@ -608,23 +616,32 @@ default_conninfo(ConnInfo) -> username => undefined, conn_props => #{}, connected => true, + proto_name => <<"exproto">>, + proto_ver => <<"1.0">>, connected_at => erlang:system_time(millisecond), keepalive => 0, receive_maximum => 0, expiry_interval => 0}. -default_clientinfo(#{peername := {PeerHost, _}, +default_clientinfo(#{peername := {PeerHost, PeerPort}, sockname := {_, SockPort}}) -> #{zone => external, - protocol => undefined, + protocol => exproto, peerhost => PeerHost, sockport => SockPort, - clientid => undefined, + clientid => anonymous_clientid(PeerHost, PeerPort), username => undefined, is_bridge => false, is_superuser => false, mountpoint => undefined}. +anonymous_clientid(PeerHost, PeerPort) -> + iolist_to_binary( + ["exproto-anonymous-", + inet:ntoa(PeerHost), "-", integer_to_list(PeerPort), + "-", emqx_rule_id:gen() + ]). + stringfy(Reason) -> unicode:characters_to_binary((io_lib:format("~0p", [Reason]))). diff --git a/apps/emqx_exproto/src/emqx_exproto_conn.erl b/apps/emqx_exproto/src/emqx_exproto_conn.erl index 02c0b31d6..f0cca7bec 100644 --- a/apps/emqx_exproto/src/emqx_exproto_conn.erl +++ b/apps/emqx_exproto/src/emqx_exproto_conn.erl @@ -439,7 +439,8 @@ handle_msg({close, Reason}, State) -> ?LOG(debug, "Force to close the socket due to ~p", [Reason]), handle_info({sock_closed, Reason}, close_socket(State)); -handle_msg({event, connected}, State = #state{channel = Channel}) -> +handle_msg({event, Event}, State = #state{channel = Channel}) + when Event == connected; Event == accepted -> ClientId = emqx_exproto_channel:info(clientid, Channel), emqx_cm:insert_channel_info(ClientId, info(State), stats(State)); diff --git a/apps/emqx_management/src/emqx_management.appup.src b/apps/emqx_management/src/emqx_management.appup.src index 1463334b4..a91868861 100644 --- a/apps/emqx_management/src/emqx_management.appup.src +++ b/apps/emqx_management/src/emqx_management.appup.src @@ -1,13 +1,13 @@ %% -*- mode: erlang -*- {VSN, - [ {<<"4\\.3\\.[0-9]+">>, + [ {<<"4\\.3\\.([0-9]|1[0])">>, [ {apply,{minirest,stop_http,['http:management']}}, {apply,{minirest,stop_http,['https:management']}}, {restart_application, emqx_management} ]}, {<<".*">>, []} ], - [ {<<"4\\.3\\.[0-9]+">>, + [ {<<"4\\.3\\.([0-9]|1[0])">>, [ {apply,{minirest,stop_http,['http:management']}}, {apply,{minirest,stop_http,['https:management']}}, {restart_application, emqx_management} diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 7d3dbddc8..f47fea6eb 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -271,6 +271,7 @@ format_channel_info({_Key, Info, Stats0}) -> SessCreated = maps:get(created_at, Session, maps:get(connected_at, ConnInfo)), Connected = case maps:get(conn_state, Info, connected) of connected -> true; + accepted -> true; %% for exproto anonymous clients _ -> false end, NStats = Stats#{max_subscriptions => maps:get(subscriptions_max, Stats, 0), diff --git a/src/emqx.appup.src b/src/emqx.appup.src index a133d37a6..5300e6bc8 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -4,10 +4,12 @@ [{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}]}, {"4.3.12", [{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, @@ -23,6 +25,7 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.11", [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, @@ -43,6 +46,7 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.10", [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, @@ -63,6 +67,7 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.9", [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, @@ -88,6 +93,7 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.8", [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, @@ -113,6 +119,7 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.7", [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, @@ -139,7 +146,8 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.6", - [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pmon,brutal_purge,soft_purge,[]}, + {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, {apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}}, @@ -167,6 +175,7 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.5", [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, {apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}}, @@ -195,6 +204,7 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.4", [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, {apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}}, @@ -224,6 +234,7 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.3", [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, {apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}}, @@ -254,6 +265,7 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.2", [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, {apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}}, @@ -284,6 +296,7 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.1", [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, {apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}}, @@ -318,6 +331,7 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.0", [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, {apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}}, @@ -357,11 +371,13 @@ [{"4.3.13", [{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}]}, {"4.3.12", [{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, @@ -376,6 +392,7 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.11", [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, @@ -395,6 +412,7 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.10", [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, @@ -414,6 +432,7 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.9", [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_vm,brutal_purge,soft_purge,[]}, @@ -438,6 +457,7 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.8", [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_vm,brutal_purge,soft_purge,[]}, @@ -462,6 +482,7 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.7", [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_vm,brutal_purge,soft_purge,[]}, @@ -488,6 +509,7 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.6", [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, @@ -514,6 +536,7 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.5", [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, @@ -541,6 +564,7 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.4", [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, @@ -569,6 +593,7 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.3", [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, @@ -598,6 +623,7 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.2", [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, @@ -627,6 +653,7 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.1", [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, @@ -660,6 +687,7 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.0", [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, diff --git a/src/emqx_pmon.erl b/src/emqx_pmon.erl index 1f9ba7af6..cfa366027 100644 --- a/src/emqx_pmon.erl +++ b/src/emqx_pmon.erl @@ -50,9 +50,11 @@ monitor(Pid, PMon) -> ?MODULE:monitor(Pid, undefined, PMon). -spec(monitor(pid(), term(), pmon()) -> pmon()). -monitor(Pid, Val, PMon = ?PMON(Map)) -> +monitor(Pid, Val, ?PMON(Map)) -> case maps:is_key(Pid, Map) of - true -> PMon; + true -> + {Ref, _Val} = maps:get(Pid, Map), + ?PMON(maps:put(Pid, {Ref, Val}, Map)); false -> Ref = erlang:monitor(process, Pid), ?PMON(maps:put(Pid, {Ref, Val}, Map))