From 3645cb244baed803f6ebd0015e8ebfe125299512 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Sun, 20 Mar 2022 09:50:27 +0800 Subject: [PATCH 1/7] fix(gw): throw the exproto start grpc server starting failure error --- apps/emqx_gateway/src/coap/emqx_coap_impl.erl | 2 +- apps/emqx_gateway/src/emqx_gateway_api.erl | 4 ++-- apps/emqx_gateway/src/emqx_gateway_http.erl | 7 +++++- .../src/exproto/emqx_exproto_impl.erl | 22 +++++++++++-------- .../src/lwm2m/emqx_lwm2m_impl.erl | 2 +- apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl | 2 +- .../src/stomp/emqx_stomp_impl.erl | 2 +- 7 files changed, 25 insertions(+), 16 deletions(-) diff --git a/apps/emqx_gateway/src/coap/emqx_coap_impl.erl b/apps/emqx_gateway/src/coap/emqx_coap_impl.erl index 220c25367..9a4c902dc 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_impl.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_impl.erl @@ -83,7 +83,7 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) -> logger:error("Failed to update ~ts; " "reason: {~0p, ~0p} stacktrace: ~0p", [GwName, Class, Reason, Stk]), - {error, {Class, Reason}} + {error, Reason} end. on_gateway_unload(_Gateway = #{ name := GwName, diff --git a/apps/emqx_gateway/src/emqx_gateway_api.erl b/apps/emqx_gateway/src/emqx_gateway_api.erl index d2fd86600..85aad78d2 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api.erl @@ -97,7 +97,7 @@ gateway_insta(delete, #{bindings := #{name := Name0}}) -> ok -> {204}; {error, Reason} -> - return_http_error(400, Reason) + emqx_gateway_http:reason2resp(Reason) end end); gateway_insta(get, #{bindings := #{name := Name0}}) -> @@ -134,7 +134,7 @@ gateway_insta(put, #{body := GwConf0, {ok, Gateway} -> {200, Gateway}; {error, Reason} -> - return_http_error(500, Reason) + emqx_gateway_http:reason2resp(Reason) end end). diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index 228a51b28..b53ccd737 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -311,7 +311,12 @@ return_http_error(Code, Msg) -> -spec reason2msg({atom(), map()} | any()) -> error | string(). reason2msg({badconf, #{key := Key, value := Value, reason := Reason}}) -> - fmtstr("Bad config value '~s' for '~s', reason: ~s", [Value, Key, Reason]); + NValue = case emqx_json:safe_encode(Value) of + {ok, Str} -> Str; + {error, _} -> emqx_gateway_utils:stringfy(Value) + end, + fmtstr("Bad config value '~s' for '~s', reason: ~s", + [NValue, Key, Reason]); reason2msg({badres, #{resource := gateway, gateway := GwName, reason := not_found}}) -> diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl index fac55e24d..2524c1b5d 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl @@ -57,18 +57,19 @@ on_gateway_load(_Gateway = #{ name := GwName, config := Config }, Ctx) -> %% XXX: How to monitor it ? - %% Start grpc client pool & client channel - PoolName = pool_name(GwName), - PoolSize = emqx_vm:schedulers() * 2, - {ok, PoolSup} = emqx_pool_sup:start_link( - PoolName, hash, PoolSize, - {emqx_exproto_gcli, start_link, []}), _ = start_grpc_client_channel(GwName, maps:get(handler, Config, undefined) ), %% XXX: How to monitor it ? _ = start_grpc_server(GwName, maps:get(server, Config, undefined)), + %% XXX: How to monitor it ? + PoolName = pool_name(GwName), + PoolSize = emqx_vm:schedulers() * 2, + {ok, PoolSup} = emqx_pool_sup:start_link( + PoolName, hash, PoolSize, + {emqx_exproto_gcli, start_link, []}), + NConfig = maps:without( [server, handler], Config#{pool_name => PoolName} @@ -103,7 +104,7 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) -> logger:error("Failed to update ~ts; " "reason: {~0p, ~0p} stacktrace: ~0p", [GwName, Class, Reason, Stk]), - {error, {Class, Reason}} + {error, Reason} end. on_gateway_unload(_Gateway = #{ name := GwName, @@ -141,8 +142,11 @@ start_grpc_server(GwName, Options = #{bind := ListenOn}) -> console_print("Start ~ts gRPC server on ~p successfully.~n", [GwName, ListenOn]); {error, Reason} -> - ?ELOG("Failed to start ~ts gRPC server on ~p, reason: ~p", - [GwName, ListenOn, Reason]) + ?ELOG("Failed to start ~ts gRPC server on ~p, reason: ~0p", + [GwName, ListenOn, Reason]), + throw({badconf, #{key => server, + value => Options, + reason => illegal_grpc_server_confs}}) end. stop_grpc_server(GwName) -> diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl index 873fbe2c3..0cfb512b5 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl @@ -87,7 +87,7 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) -> logger:error("Failed to update ~ts; " "reason: {~0p, ~0p} stacktrace: ~0p", [GwName, Class, Reason, Stk]), - {error, {Class, Reason}} + {error, Reason} end. on_gateway_unload(_Gateway = #{ name := GwName, diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl index 3579bf4bd..a99f81f0a 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl @@ -106,7 +106,7 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) -> logger:error("Failed to update ~ts; " "reason: {~0p, ~0p} stacktrace: ~0p", [GwName, Class, Reason, Stk]), - {error, {Class, Reason}} + {error, Reason} end. on_gateway_unload(_Gateway = #{ name := GwName, diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl index 8e9ae6d1f..47d1899c9 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl @@ -87,7 +87,7 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) -> logger:error("Failed to update ~ts; " "reason: {~0p, ~0p} stacktrace: ~0p", [GwName, Class, Reason, Stk]), - {error, {Class, Reason}} + {error, Reason} end. on_gateway_unload(_Gateway = #{ name := GwName, From edb1460b56d152659abebf07692f33bac6fe3dc4 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Sun, 20 Mar 2022 10:18:50 +0800 Subject: [PATCH 2/7] chore(rule-engine): more safe generate even_msg content --- apps/emqx_rule_engine/src/emqx_rule_events.erl | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 10e3e41ef..de3af52ac 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -191,12 +191,12 @@ eventmsg_connected(_ClientInfo = #{ clean_start := CleanStart, proto_name := ProtoName, proto_ver := ProtoVer, - keepalive := Keepalive, - connected_at := ConnectedAt, - conn_props := ConnProps, - receive_maximum := RcvMax, - expiry_interval := ExpiryInterval + connected_at := ConnectedAt }) -> + Keepalive = maps:get(keepalive, ConnInfo, 0), + ConnProps = maps:get(conn_props, ConnInfo, #{}), + RcvMax = maps:get(receive_maximum, ConnInfo, 0), + ExpiryInterval = maps:get(expiry_interval, ConnInfo, 0), with_basic_columns('client.connected', #{clientid => ClientId, username => Username, From 5682dcb72ec1e5fd50db69059c958d14ff3b8d85 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 21 Mar 2022 12:01:22 +0800 Subject: [PATCH 3/7] fix(gw): enrich conninfo for coap&lwm2m --- .../src/coap/emqx_coap_channel.erl | 32 ++++++++++++++----- .../src/lwm2m/emqx_lwm2m_channel.erl | 28 +++++++++++++--- .../emqx_rule_engine/src/emqx_rule_events.erl | 2 +- 3 files changed, 48 insertions(+), 14 deletions(-) diff --git a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl index cc5732ee2..7fa948a47 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl @@ -119,10 +119,10 @@ stats(_) -> []. -spec init(map(), map()) -> channel(). -init(ConnInfoT = #{peername := {PeerHost, _}, - sockname := {_, SockPort}}, +init(ConnInfo = #{peername := {PeerHost, _}, + sockname := {_, SockPort}}, #{ctx := Ctx} = Config) -> - Peercert = maps:get(peercert, ConnInfoT, undefined), + Peercert = maps:get(peercert, ConnInfo, undefined), Mountpoint = maps:get(mountpoint, Config, <<>>), ListenerId = case maps:get(listener, Config, undefined) of undefined -> undefined; @@ -144,10 +144,6 @@ init(ConnInfoT = #{peername := {PeerHost, _}, } ), - %% because it is possible to disconnect after init, and then trigger the - %% $event.disconnected hook and these two fields are required in the hook - ConnInfo = ConnInfoT#{proto_name => <<"CoAP">>, proto_ver => <<"1">>}, - Heartbeat = ?GET_IDLE_TIME(Config), #channel{ ctx = Ctx , conninfo = ConnInfo @@ -405,6 +401,25 @@ run_conn_hooks(Input, Channel = #channel{ctx = Ctx, {ok, Input, Channel} end. +enrich_conninfo({Queries, _Msg}, + Channel = #channel{ + keepalive = KeepAlive, + conninfo = ConnInfo}) -> + case Queries of + #{<<"clientid">> := ClientId} -> + Interval = maps:get(interval, emqx_keepalive:info(KeepAlive)), + NConnInfo = ConnInfo#{ clientid => ClientId + , proto_name => <<"CoAP">> + , proto_ver => <<"1">> + , clean_start => true + , keepalive => Interval + , expiry_interval => 0 + }, + {ok, Channel#channel{conninfo = NConnInfo}}; + _ -> + {error, "invalid queries", Channel} + end. + enrich_clientinfo({Queries, Msg}, Channel = #channel{clientinfo = ClientInfo0}) -> case Queries of @@ -583,7 +598,8 @@ process_nothing(_, _, Channel) -> process_connection({open, Req}, Result, Channel, Iter) -> Queries = emqx_coap_message:get_option(uri_query, Req), case emqx_misc:pipeline( - [ fun run_conn_hooks/2 + [ fun enrich_conninfo/2 + , fun run_conn_hooks/2 , fun enrich_clientinfo/2 , fun set_log_meta/2 , fun auth_connect/2 diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl index c53ca0528..0ff3dad27 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl @@ -108,10 +108,10 @@ info(ctx, #channel{ctx = Ctx}) -> stats(_) -> []. -init(ConnInfoT = #{peername := {PeerHost, _}, - sockname := {_, SockPort}}, +init(ConnInfo = #{peername := {PeerHost, _}, + sockname := {_, SockPort}}, #{ctx := Ctx} = Config) -> - Peercert = maps:get(peercert, ConnInfoT, undefined), + Peercert = maps:get(peercert, ConnInfo, undefined), Mountpoint = maps:get(mountpoint, Config, undefined), ListenerId = case maps:get(listener, Config, undefined) of undefined -> undefined; @@ -133,8 +133,6 @@ init(ConnInfoT = #{peername := {PeerHost, _}, } ), - ConnInfo = ConnInfoT#{proto_name => <<"LwM2M">>, proto_ver => <<"0.0">>}, - #channel{ ctx = Ctx , conninfo = ConnInfo , clientinfo = ClientInfo @@ -369,6 +367,7 @@ do_takeover(_DesireId, Msg, Channel) -> do_connect(Req, Result, Channel, Iter) -> case emqx_misc:pipeline( [ fun check_lwm2m_version/2 + , fun enrich_conninfo/2 , fun run_conn_hooks/2 , fun enrich_clientinfo/2 , fun set_log_meta/2 @@ -427,6 +426,25 @@ run_conn_hooks(Input, Channel = #channel{ctx = Ctx, {ok, Input, Channel} end. +enrich_conninfo(#coap_message{options = Options}, + Channel = #channel{ + conninfo = ConnInfo}) -> + Query = maps:get(uri_query, Options, #{}), + case Query of + #{<<"ep">> := Epn, <<"lt">> := Lifetime} -> + ClientId = maps:get(<<"device_id">>, Query, Epn), + NConnInfo = ConnInfo#{ clientid => ClientId + , proto_name => <<"LwM2M">> + , proto_ver => <<"1.0.1">> + , clean_start => true + , keepalive => binary_to_integer(Lifetime) + , expiry_interval => 0 + }, + {ok, Channel#channel{conninfo = NConnInfo}}; + _ -> + {error, "invalid queries", Channel} + end. + enrich_clientinfo(#coap_message{options = Options} = Msg, Channel = #channel{clientinfo = ClientInfo0}) -> Query = maps:get(uri_query, Options, #{}), diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index de3af52ac..2cd43e691 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -185,7 +185,7 @@ eventmsg_connected(_ClientInfo = #{ is_bridge := IsBridge, mountpoint := Mountpoint }, - _ConnInfo = #{ + ConnInfo = #{ peername := PeerName, sockname := SockName, clean_start := CleanStart, From 053f9b422ccf8ea127c9a7625603a1b9703bb568 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 21 Mar 2022 14:46:47 +0800 Subject: [PATCH 4/7] fix(stomp): fix client.disconnect not trigger --- .../src/stomp/emqx_stomp_channel.erl | 23 ++++++++----------- .../src/stomp/emqx_stomp_frame.erl | 2 ++ 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl index e0ab47f73..1ac660d4b 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl @@ -512,8 +512,15 @@ handle_in(?PACKET(?CMD_ABORT, Headers), handle_out(receipt, receipt_id(Headers), NChannel) end); -handle_in(?PACKET(?CMD_DISCONNECT, Headers), Channel) -> - shutdown_with_recepit(normal, receipt_id(Headers), Channel); +handle_in(?PACKET(?CMD_DISCONNECT, Headers), + Channel = #channel{conn_state = connected}) -> + Outgoings = case receipt_id(Headers) of + undefined -> [{close, normal}]; + ReceiptId -> + [{outgoing, receipt_frame(ReceiptId)}, + {close, normal}] + end, + {ok, Outgoings, Channel}; handle_in({frame_error, Reason}, Channel = #channel{conn_state = _ConnState}) -> ?SLOG(error, #{ msg => "unexpected_frame_error" @@ -765,7 +772,6 @@ handle_info({sock_closed, Reason}, %emqx_zone:enable_flapping_detect(Zone) % andalso emqx_flapping:detect(ClientInfo), NChannel = ensure_disconnected(Reason, Channel), - %% XXX: Session keepper detect here shutdown(Reason, NChannel); handle_info({sock_closed, Reason}, @@ -918,20 +924,9 @@ reply(Reply, Channel) -> shutdown(Reason, Channel) -> {shutdown, Reason, Channel}. -shutdown_with_recepit(Reason, ReceiptId, Channel) -> - case ReceiptId of - undefined -> - {shutdown, Reason, Channel}; - _ -> - {shutdown, Reason, receipt_frame(ReceiptId), Channel} - end. - shutdown(Reason, AckFrame, Channel) -> {shutdown, Reason, AckFrame, Channel}. -%shutdown_and_reply(Reason, Reply, Channel) -> -% {shutdown, Reason, Reply, Channel}. - shutdown_and_reply(Reason, Reply, OutPkt, Channel) -> {shutdown, Reason, Reply, OutPkt, Channel}. diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl index 314b2d884..7158cbcdb 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl @@ -158,6 +158,8 @@ parse(<>, Phase =:= hdvalue -> parse(Phase, Rest, acc(unescape(Ch), State)); +parse(<>, Parser = #{phase := none}) -> + {more, Parser}; parse(Bytes, #{phase := none, state := State}) -> parse(command, Bytes, State). From 670749493bf63be0f813fcd3011d2aaa9705ac11 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 21 Mar 2022 16:22:47 +0800 Subject: [PATCH 5/7] fix(coap): reject duplicated connection request --- .../src/coap/emqx_coap_channel.erl | 26 ++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl index 7fa948a47..d15b8605a 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl @@ -595,7 +595,8 @@ process_out(Outs, Result, Channel, _) -> process_nothing(_, _, Channel) -> {ok, Channel}. -process_connection({open, Req}, Result, Channel, Iter) -> +process_connection({open, Req}, Result, + Channel = #channel{conn_state = idle}, Iter) -> Queries = emqx_coap_message:get_option(uri_query, Req), case emqx_misc:pipeline( [ fun enrich_conninfo/2 @@ -610,12 +611,31 @@ process_connection({open, Req}, Result, Channel, Iter) -> process_connect(ensure_connected(NChannel), Req, Result, Iter); {error, ReasonCode, NChannel} -> ErrMsg = io_lib:format("Login Failed: ~ts", [ReasonCode]), - Payload = erlang:list_to_binary(lists:flatten(ErrMsg)), + Payload = iolist_to_binary(ErrMsg), iter(Iter, reply({error, bad_request}, Payload, Req, Result), NChannel) end; - +process_connection({open, Req}, Result, + Channel = #channel{ + conn_state = ConnState, + clientinfo = #{clientid := ClientId}}, Iter) + when ConnState == connected -> + Queries = emqx_coap_message:get_option(uri_query, Req), + ErrMsg0 = + case Queries of + #{<<"clientid">> := ClientId} -> + "client has connected"; + #{<<"clientid">> := ReqClientId} -> + ["channel has registered by: ", ReqClientId]; + _ -> + "invalid queries" + end, + ErrMsg = io_lib:format("Bad Request: ~ts", [ErrMsg0]), + Payload = iolist_to_binary(ErrMsg), + iter(Iter, + reply({error, bad_request}, Payload, Req, Result), + Channel); process_connection({close, Msg}, _, Channel, _) -> Reply = emqx_coap_message:piggyback({ok, deleted}, Msg), {shutdown, close, Reply, Channel}. From 094c4ad2625a3d8496bf848a7a91e02546044837 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 21 Mar 2022 18:44:01 +0800 Subject: [PATCH 6/7] fix(exproto): don't carry the ssl confs if ssl.enable is false --- apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl index 2524c1b5d..5c9967182 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl @@ -166,14 +166,16 @@ start_grpc_client_channel(GwName, Options = #{address := Address}) -> }}) end, - case maps:to_list(maps:get(ssl, Options, #{})) of - [] -> + case emqx_map_lib:deep_get([ssl, enable], Options, false) of + false -> SvrAddr = compose_http_uri(http, Host, Port), grpc_client_sup:create_channel_pool(GwName, SvrAddr, #{}); - SslOpts -> + true -> + SslOpts = maps:to_list(maps:get(ssl, Options, #{})), ClientOpts = #{gun_opts => #{transport => ssl, transport_opts => SslOpts}}, + SvrAddr = compose_http_uri(https, Host, Port), grpc_client_sup:create_channel_pool(GwName, SvrAddr, ClientOpts) end. From 6da4592bb0bfeefda5cb57b68a6eba2ff55103ba Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 21 Mar 2022 18:52:41 +0800 Subject: [PATCH 7/7] chore: fix elivs warnings --- apps/emqx_rule_engine/src/emqx_rule_events.erl | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 2cd43e691..89731e0bd 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -55,6 +55,8 @@ ]). -endif. +-elvis([{elvis_style, dont_repeat_yourself, disable}]). + event_names() -> [ 'client.connected' , 'client.disconnected' @@ -407,7 +409,8 @@ event_info_message_dropped() -> event_info_common( 'message.dropped', {<<"message routing-drop">>, <<"消息转发丢弃"/utf8>>}, - {<<"messages are discarded during routing, usually because there are no subscribers">>, <<"消息在转发的过程中被丢弃,一般是由于没有订阅者"/utf8>>}, + {<<"messages are discarded during routing, usually because there are no subscribers">>, + <<"消息在转发的过程中被丢弃,一般是由于没有订阅者"/utf8>>}, <<"SELECT * FROM \"$events/message_dropped\" WHERE topic =~ 't/#'">> ). event_info_delivery_dropped() -> @@ -415,7 +418,7 @@ event_info_delivery_dropped() -> 'delivery.dropped', {<<"message delivery-drop">>, <<"消息投递丢弃"/utf8>>}, {<<"messages are discarded during delivery, i.e. because the message queue is full">>, - <<"消息在投递的过程中被丢弃,比如由于消息队列已满"/utf8>>}, + <<"消息在投递的过程中被丢弃,比如由于消息队列已满"/utf8>>}, <<"SELECT * FROM \"$events/delivery_dropped\" WHERE topic =~ 't/#'">> ). event_info_client_connected() ->