From 8cfd080a97d848b8d808aef435d6f87dbd045f42 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Sun, 7 Apr 2019 22:31:42 +0800 Subject: [PATCH 1/9] Fix gen_statem:reply/2 bug in emqx_client module Prior to this change, the arguments passed to gen_statem:reply is bad args, first arg should be a tuple, but it is a pid. So it would trigger crash. This change fix this bug. --- src/emqx_client.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 75b384fcb..371b020f1 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -1049,7 +1049,7 @@ timeout_calls(Timeout, Calls) -> timeout_calls(Now, Timeout, Calls) -> lists:foldl(fun(C = #call{from = From, ts = Ts}, Acc) -> case (timer:now_diff(Now, Ts) div 1000) >= Timeout of - true -> gen_statem:reply(From, {error, ack_timeout}), + true -> From ! {error, ack_timeout}, Acc; false -> [C | Acc] end @@ -1231,4 +1231,3 @@ bump_last_packet_id(State = #state{last_packet_id = Id}) -> -spec next_packet_id(packet_id()) -> packet_id(). next_packet_id(?MAX_PACKET_ID) -> 1; next_packet_id(Id) -> Id + 1. - From 627ea0afe8cfc8e144572ac1516f7984a0b5c55e Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Tue, 9 Apr 2019 11:32:37 +0800 Subject: [PATCH 2/9] Add `is_bridge_exist` api in emqx_bridge_sup --- src/emqx_bridge_sup.erl | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/emqx_bridge_sup.erl b/src/emqx_bridge_sup.erl index fc0be3995..b00bb9012 100644 --- a/src/emqx_bridge_sup.erl +++ b/src/emqx_bridge_sup.erl @@ -20,11 +20,12 @@ %% APIs -export([ start_link/0 , start_link/1 - , bridges/0 ]). -export([ create_bridge/2 , drop_bridge/1 + , bridges/0 + , is_bridge_exist/1 ]). %% supervisor callbacks @@ -58,6 +59,13 @@ bridge_spec({Name, Config}) -> bridges() -> [{Name, emqx_bridge:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?SUP)]. +-spec(is_bridge_exist(atom() | pid()) -> boolean()). +is_bridge_exist(Id) -> + case supervisor:get_childspec(?SUP, Id) of + {ok, _ChildSpec} -> true; + {error, _Error} -> false + end. + create_bridge(Id, Config) -> supervisor:start_child(?SUP, bridge_spec({Id, Config})). @@ -69,4 +77,3 @@ drop_bridge(Id) -> ?LOG(error, "[Bridge] Delete bridge failed", [Error]), Error end. - From eb7b1797c258beeef5be99a7586fcb9bf6d84884 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Tue, 9 Apr 2019 18:38:10 +0800 Subject: [PATCH 3/9] Provide bridge handler to extense emqx_bridge --- src/emqx_bridge.erl | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index a15851f0f..6af56d11b 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -119,6 +119,7 @@ -define(DEFAULT_SEND_AHEAD, 8). -define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)). -define(DEFAULT_SEG_BYTES, (1 bsl 20)). +-define(NO_BRIDGE_HANDLER, undefined). -define(maybe_send, {next_event, internal, maybe_send}). %% @doc Start a bridge worker. Supported configs: @@ -277,7 +278,8 @@ init(Config) -> subscriptions => Subs, replayq => Queue, inflight => [], - connection => undefined + connection => undefined, + bridge_handler => Get(bridge_handler, ?NO_BRIDGE_HANDLER) }}. code_change(_Vsn, State, Data, _Extra) -> @@ -321,7 +323,10 @@ connecting(enter, _, #{reconnect_delay_ms := Timeout, {ok, ConnRef, Conn} -> ?LOG(info, "[Bridge] Bridge ~p connected", [name()]), Action = {state_timeout, 0, connected}, - {keep_state, State#{conn_ref => ConnRef, connection => Conn}, Action}; + {keep_state, + eval_bridge_handler(State#{ conn_ref => ConnRef + , connection => Conn}, connected), + Action}; error -> Action = {state_timeout, Timeout, reconnect}, {keep_state_and_data, Action} @@ -416,6 +421,12 @@ common(StateName, Type, Content, State) -> [name(), Type, StateName, Content]), {keep_state, State}. +eval_bridge_handler(State = #{bridge_handler := ?NO_BRIDGE_HANDLER}, _Msg) -> + State; +eval_bridge_handler(State = #{bridge_handler := Handler}, Msg) -> + _ = Handler(Msg), + State. + ensure_present(Key, Topic, State) -> Topics = maps:get(Key, State), case is_topic_present(Topic, Topics) of @@ -553,9 +564,11 @@ disconnect(#{connection := Conn, connect_module := Module } = State) when Conn =/= undefined -> ok = Module:stop(ConnRef, Conn), - State#{conn_ref => undefined, - connection => undefined}; -disconnect(State) -> State. + eval_bridge_handler(State#{conn_ref => undefined, + connection => undefined}, + disconnected); +disconnect(State) -> + eval_bridge_handler(State, disconnected). %% Called only when replayq needs to dump it to disk. msg_marshaller(Bin) when is_binary(Bin) -> emqx_bridge_msg:from_binary(Bin); From a53320069be69ad1f80ebc315caf520ae33f6f94 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 10 Apr 2019 09:14:53 +0800 Subject: [PATCH 4/9] Fix trace logger level not work (#2408) * Fix trace logger level not work #2385 --- src/emqx_tracer.erl | 29 +++++++++++++++++++++++++---- test/emqx_tracer_SUITE.erl | 4 ++++ 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/src/emqx_tracer.erl b/src/emqx_tracer.erl index 464a28ad2..c85aa5007 100644 --- a/src/emqx_tracer.erl +++ b/src/emqx_tracer.erl @@ -76,11 +76,21 @@ trace(publish, #message{from = From, topic = Topic, payload = Payload}) %% @doc Start to trace client_id or topic. -spec(start_trace(trace_who(), logger:level(), string()) -> ok | {error, term()}). start_trace({client_id, ClientId}, Level, LogFile) -> - start_trace({start_trace, {client_id, ClientId}, Level, LogFile}); + do_start_trace({client_id, ClientId}, Level, LogFile); start_trace({topic, Topic}, Level, LogFile) -> - start_trace({start_trace, {topic, Topic}, Level, LogFile}). + do_start_trace({topic, Topic}, Level, LogFile). -start_trace(Req) -> gen_server:call(?MODULE, Req, infinity). +do_start_trace(Who, Level, LogFile) -> + #{level := PrimaryLevel} = logger:get_primary_config(), + try logger:compare_levels(log_level(Level), PrimaryLevel) of + lt -> + {error, io_lib:format("Cannot trace at a log level (~s) lower than the primary log level (~s)", [Level, PrimaryLevel])}; + _GtOrEq -> + gen_server:call(?MODULE, {start_trace, Who, Level, LogFile}, 5000) + catch + _:Error -> + {error, Error} + end. %% @doc Stop tracing client_id or topic. -spec(stop_trace(trace_who()) -> ok | {error, term()}). @@ -109,7 +119,7 @@ handle_call({start_trace, Who, Level, LogFile}, _From, State = #state{traces = T config => #{type => halt, file => LogFile}, filter_default => stop, filters => [{meta_key_filter, - {fun filter_by_meta_key/2, Who} }]}) of + {fun filter_by_meta_key/2, Who} }]}) of ok -> ?LOG(info, "[Tracer] Start trace for ~p", [Who]), {reply, ok, State#state{traces = maps:put(Who, {Level, LogFile}, Traces)}}; @@ -168,3 +178,14 @@ filter_by_meta_key(#{meta:=Meta}=LogEvent, {MetaKey, MetaValue}) -> end; _ -> ignore end. + +log_level(emergency) -> emergency; +log_level(alert) -> alert; +log_level(critical) -> critical; +log_level(error) -> error; +log_level(warning) -> warning; +log_level(notice) -> notice; +log_level(info) -> info; +log_level(debug) -> debug; +log_level(all) -> debug; +log_level(_) -> throw(invalid_log_level). diff --git a/test/emqx_tracer_SUITE.erl b/test/emqx_tracer_SUITE.erl index 323700823..327930fc3 100644 --- a/test/emqx_tracer_SUITE.erl +++ b/test/emqx_tracer_SUITE.erl @@ -38,8 +38,12 @@ start_traces(_Config) -> emqx_client:connect(T), %% Start tracing + emqx_logger:set_log_level(error), + {error, _} = emqx_tracer:start_trace({client_id, <<"client">>}, debug, "tmp/client.log"), + emqx_logger:set_log_level(debug), ok = emqx_tracer:start_trace({client_id, <<"client">>}, debug, "tmp/client.log"), ok = emqx_tracer:start_trace({client_id, <<"client2">>}, all, "tmp/client2.log"), + {error, invalid_log_level} = emqx_tracer:start_trace({client_id, <<"client3">>}, bad_level, "tmp/client3.log"), ok = emqx_tracer:start_trace({topic, <<"a/#">>}, all, "tmp/topic_trace.log"), ct:sleep(100), From 6a4c318acbc27d6e8379ceb9f50e26d26f2973ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Wed, 10 Apr 2019 14:22:50 +0800 Subject: [PATCH 5/9] Update metrics when session process terminates --- src/emqx_session.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 2ef64c9aa..f1aa6d3bd 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -676,6 +676,7 @@ terminate(Reason, #state{will_msg = WillMsg, username = Username, conn_pid = ConnPid, old_conn_pid = OldConnPid}) -> + emqx_metrics:commit(), send_willmsg(WillMsg), [maybe_shutdown(Pid, Reason) || Pid <- [ConnPid, OldConnPid]], ok = emqx_hooks:run('session.terminated', [#{client_id => ClientId, username => Username}, Reason]). From 4fc81cef855d09829ba655f343903ad7bc57f264 Mon Sep 17 00:00:00 2001 From: Gilbert Date: Thu, 11 Apr 2019 12:20:32 +0800 Subject: [PATCH 6/9] Change log level unexpected info in client (#2422) Change log level for unexpected info in client and rejust code format in emqx_bridge --- src/emqx_bridge.erl | 14 ++++++-------- src/emqx_client.erl | 5 +++++ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index 6af56d11b..03e7a478e 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -323,10 +323,9 @@ connecting(enter, _, #{reconnect_delay_ms := Timeout, {ok, ConnRef, Conn} -> ?LOG(info, "[Bridge] Bridge ~p connected", [name()]), Action = {state_timeout, 0, connected}, - {keep_state, - eval_bridge_handler(State#{ conn_ref => ConnRef - , connection => Conn}, connected), - Action}; + State0 = State#{conn_ref => ConnRef, connection => Conn}, + State1 = eval_bridge_handler(State0, connected), + {keep_state, State1, Action}; error -> Action = {state_timeout, Timeout, reconnect}, {keep_state_and_data, Action} @@ -424,7 +423,7 @@ common(StateName, Type, Content, State) -> eval_bridge_handler(State = #{bridge_handler := ?NO_BRIDGE_HANDLER}, _Msg) -> State; eval_bridge_handler(State = #{bridge_handler := Handler}, Msg) -> - _ = Handler(Msg), + Handler(Msg), State. ensure_present(Key, Topic, State) -> @@ -564,9 +563,8 @@ disconnect(#{connection := Conn, connect_module := Module } = State) when Conn =/= undefined -> ok = Module:stop(ConnRef, Conn), - eval_bridge_handler(State#{conn_ref => undefined, - connection => undefined}, - disconnected); + State0 = State#{conn_ref => undefined, connection => undefined}, + eval_bridge_handler(State0, disconnected); disconnect(State) -> eval_bridge_handler(State, disconnected). diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 371b020f1..737c699ac 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -928,6 +928,11 @@ handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) -> ?LOG(error, "[Client] Got tcp error: ~p", [Reason]), {stop, {shutdown, Reason}, State}; +handle_event(info, EventContent = {'EXIT', _Pid, normal}, StateName, _State) -> + ?LOG(error, "[Client] State: ~s, Unexpected Event: (info, ~p)", + [StateName, EventContent]), + keep_state_and_data; + handle_event(EventType, EventContent, StateName, _StateData) -> ?LOG(error, "[Client] State: ~s, Unexpected Event: (~p, ~p)", [StateName, EventType, EventContent]), From 4d2bc48822bba8d45f4aa256141db63312ba431d Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Thu, 11 Apr 2019 16:40:02 +0800 Subject: [PATCH 7/9] Redesign ensure_start and ensure_stop api of bridge --- src/emqx_bridge.erl | 52 +++++++++++++++++++++++-------------- src/emqx_bridge_connect.erl | 3 +-- 2 files changed, 34 insertions(+), 21 deletions(-) diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index 03e7a478e..3261a02bc 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -120,6 +120,7 @@ -define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)). -define(DEFAULT_SEG_BYTES, (1 bsl 20)). -define(NO_BRIDGE_HANDLER, undefined). +-define(NO_FROM, undefined). -define(maybe_send, {next_event, internal, maybe_send}). %% @doc Start a bridge worker. Supported configs: @@ -297,8 +298,7 @@ standing_by(enter, _, #{start_type := auto}) -> standing_by(enter, _, #{start_type := manual}) -> keep_state_and_data; standing_by({call, From}, ensure_started, State) -> - {next_state, connecting, State, - [{reply, From, ok}]}; + do_connect({call, From}, standing_by, State); standing_by(state_timeout, do_connect, State) -> {next_state, connecting, State}; standing_by(info, Info, State) -> @@ -313,23 +313,8 @@ standing_by(Type, Content, State) -> connecting(enter, connected, #{reconnect_delay_ms := Timeout}) -> Action = {state_timeout, Timeout, reconnect}, {keep_state_and_data, Action}; -connecting(enter, _, #{reconnect_delay_ms := Timeout, - connect_fun := ConnectFun, - subscriptions := Subs, - forwards := Forwards - } = State) -> - ok = subscribe_local_topics(Forwards), - case ConnectFun(Subs) of - {ok, ConnRef, Conn} -> - ?LOG(info, "[Bridge] Bridge ~p connected", [name()]), - Action = {state_timeout, 0, connected}, - State0 = State#{conn_ref => ConnRef, connection => Conn}, - State1 = eval_bridge_handler(State0, connected), - {keep_state, State1, Action}; - error -> - Action = {state_timeout, Timeout, reconnect}, - {keep_state_and_data, Action} - end; +connecting(enter, _, State) -> + do_connect(enter, connecting, State); connecting(state_timeout, connected, State) -> {next_state, connected, State}; connecting(state_timeout, reconnect, _State) -> @@ -455,6 +440,35 @@ is_topic_present({Topic, _QoS}, Topics) -> is_topic_present(Topic, Topics) -> lists:member(Topic, Topics) orelse false =/= lists:keyfind(Topic, 1, Topics). +do_connect(Type, StateName, #{ forwards := Forwards + , subscriptions := Subs + , connect_fun := ConnectFun + , reconnect_delay_ms := Timeout + } = State) -> + ok = subscribe_local_topics(Forwards), + From = case StateName of + standing_by -> {call, Pid} = Type, Pid; + connecting -> ?NO_FROM + end, + DoEvent = fun (standing_by, StandingbyAction, _ConnectingAction) -> + StandingbyAction; + (connecting, _StandingbyAction, ConnectingAction) -> + ConnectingAction + end, + case ConnectFun(Subs) of + {ok, ConnRef, Conn} -> + ?LOG(info, "[Bridge] Bridge ~p connected", [name()]), + State0 = State#{conn_ref => ConnRef, connection => Conn}, + State1 = eval_bridge_handler(State0, connected), + StandingbyAction = {next_state, connected, State1, [{reply, From, ok}]}, + ConnectingAction = {keep_state, State1, {state_timeout, 0, connected}}, + DoEvent(StateName, StandingbyAction, ConnectingAction); + {error, Reason} -> + StandingbyAction = {keep_state_and_data, [{reply, From, {error, Reason}}]}, + ConnectingAction = {keep_state_and_data, {state_timeout, Timeout, reconnect}}, + DoEvent(StateName, StandingbyAction, ConnectingAction) + end. + do_ensure_present(forwards, Topic, _) -> ok = subscribe_local_topic(Topic); do_ensure_present(subscriptions, _Topic, #{connect_module := _ConnectModule, diff --git a/src/emqx_bridge_connect.erl b/src/emqx_bridge_connect.erl index 37231ca88..8685451ae 100644 --- a/src/emqx_bridge_connect.erl +++ b/src/emqx_bridge_connect.erl @@ -56,7 +56,7 @@ start(Module, Config) -> Config1 = obfuscate(Config), ?LOG(error, "[Bridge connect] Failed to connect with module=~p\n" "config=~p\nreason:~p", [Module, Config1, Reason]), - error + {error, Reason} end. obfuscate(Map) -> @@ -69,4 +69,3 @@ obfuscate(Map) -> is_sensitive(password) -> true; is_sensitive(_) -> false. - From 81ef5b9b8d9bec72233bd8c0c83acc86e25a06d6 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Wed, 10 Apr 2019 18:50:28 +0800 Subject: [PATCH 8/9] Support cookie based auth --- src/emqx_protocol.erl | 14 ++++++++++---- src/emqx_ws_connection.erl | 12 ++++++++++++ test/rfc6455_client.erl | 2 +- 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index a8bc0b288..06b0e7886 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -68,7 +68,8 @@ ignore_loop, topic_alias_maximum, conn_mod, - credentials + credentials, + ws_cookie }). -opaque(state() :: #pstate{}). @@ -85,7 +86,9 @@ %%------------------------------------------------------------------------------ -spec(init(map(), list()) -> state()). -init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) -> +init(SocketOpts = #{ peername := Peername + , peercert := Peercert + , sendfun := SendFun}, Options) -> Zone = proplists:get_value(zone, Options), #pstate{zone = Zone, sendfun = SendFun, @@ -110,7 +113,8 @@ init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendF ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false), topic_alias_maximum = #{to_client => 0, from_client => 0}, conn_mod = maps:get(conn_mod, SocketOpts, undefined), - credentials = #{}}. + credentials = #{}, + ws_cookie = maps:get(ws_cookie, SocketOpts, undefined)}. init_username(Peercert, Options) -> case proplists:get_value(peer_cert_as_username, Options) of @@ -202,11 +206,13 @@ credentials(#pstate{zone = Zone, client_id = ClientId, username = Username, peername = Peername, - peercert = Peercert}) -> + peercert = Peercert, + ws_cookie = WsCookie}) -> with_cert(#{zone => Zone, client_id => ClientId, username => Username, peername => Peername, + ws_cookie => WsCookie, mountpoint => emqx_zone:get_env(Zone, mountpoint)}, Peercert). with_cert(Credentials, undefined) -> Credentials; diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 92ae6a7dd..1d6f3e244 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -138,10 +138,22 @@ websocket_init(#state{request = Req, options = Options}) -> Peername = cowboy_req:peer(Req), Sockname = cowboy_req:sock(Req), Peercert = cowboy_req:cert(Req), + WsCookie = try cowboy_req:parse_cookies(Req) + catch + error:badarg -> + ?LOG(error, "[WS Connection] Illegal cookie"), + undefined; + Error:Reason -> + ?LOG(error, + "[WS Connection] Cookie is parsed failed, Error: ~p, Reason ~p", + [Error, Reason]), + undefined + end, ProtoState = emqx_protocol:init(#{peername => Peername, sockname => Sockname, peercert => Peercert, sendfun => send_fun(self()), + ws_cookie => WsCookie, conn_mod => ?MODULE}, Options), ParserState = emqx_protocol:parser(ProtoState), Zone = proplists:get_value(zone, Options), diff --git a/test/rfc6455_client.erl b/test/rfc6455_client.erl index f5d8f1ef4..987b72407 100644 --- a/test/rfc6455_client.erl +++ b/test/rfc6455_client.erl @@ -36,7 +36,7 @@ new(WsUrl, PPid) -> addr = Addr, path = "/" ++ Path, ppid = PPid}, - spawn(fun () -> + spawn(fun() -> start_conn(State) end). From f1616c33d97b43133072490c62db68ce0bbe3e98 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Sat, 13 Apr 2019 10:35:59 +0800 Subject: [PATCH 9/9] Convert value of attribute table to map --- src/emqx_bridge_sup.erl | 2 +- src/emqx_client.erl | 2 +- src/emqx_connection.erl | 22 ++++++++--------- src/emqx_mod_presence.erl | 16 ++++++------ src/emqx_protocol.erl | 41 ++++++++++++++++--------------- src/emqx_ws_connection.erl | 16 ++++++------ test/emqx_connection_SUITE.erl | 16 ++++++------ test/emqx_ws_connection_SUITE.erl | 18 +++++++------- 8 files changed, 68 insertions(+), 65 deletions(-) diff --git a/src/emqx_bridge_sup.erl b/src/emqx_bridge_sup.erl index b00bb9012..a40e7b2e3 100644 --- a/src/emqx_bridge_sup.erl +++ b/src/emqx_bridge_sup.erl @@ -74,6 +74,6 @@ drop_bridge(Id) -> ok -> supervisor:delete_child(?SUP, Id); Error -> - ?LOG(error, "[Bridge] Delete bridge failed", [Error]), + ?LOG(error, "[Bridge] Delete bridge failed, error : ~p", [Error]), Error end. diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 737c699ac..1c9b26a0f 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -929,7 +929,7 @@ handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) -> {stop, {shutdown, Reason}, State}; handle_event(info, EventContent = {'EXIT', _Pid, normal}, StateName, _State) -> - ?LOG(error, "[Client] State: ~s, Unexpected Event: (info, ~p)", + ?LOG(info, "[Client] State: ~s, Unexpected Event: (info, ~p)", [StateName, EventContent]), keep_state_and_data; diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index e9cfd6ae4..5beee28bb 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -87,15 +87,15 @@ info(#state{transport = Transport, rate_limit = RateLimit, pub_limit = PubLimit, proto_state = ProtoState}) -> - ConnInfo = [{socktype, Transport:type(Socket)}, - {peername, Peername}, - {sockname, Sockname}, - {conn_state, ConnState}, - {active_n, ActiveN}, - {rate_limit, rate_limit_info(RateLimit)}, - {pub_limit, rate_limit_info(PubLimit)}], + ConnInfo = #{socktype => Transport:type(Socket), + peername => Peername, + sockname => Sockname, + conn_state => ConnState, + active_n => ActiveN, + rate_limit => rate_limit_info(RateLimit), + pub_limit => rate_limit_info(PubLimit)}, ProtoInfo = emqx_protocol:info(ProtoState), - lists:usort(lists:append(ConnInfo, ProtoInfo)). + maps:merge(ConnInfo, ProtoInfo). rate_limit_info(undefined) -> #{}; @@ -109,10 +109,10 @@ attrs(CPid) when is_pid(CPid) -> attrs(#state{peername = Peername, sockname = Sockname, proto_state = ProtoState}) -> - SockAttrs = [{peername, Peername}, - {sockname, Sockname}], + SockAttrs = #{peername => Peername, + sockname => Sockname}, ProtoAttrs = emqx_protocol:attrs(ProtoState), - lists:usort(lists:append(SockAttrs, ProtoAttrs)). + maps:merge(SockAttrs, ProtoAttrs). %% Conn stats stats(CPid) when is_pid(CPid) -> diff --git a/src/emqx_mod_presence.erl b/src/emqx_mod_presence.erl index d2ce98abc..9789474d7 100644 --- a/src/emqx_mod_presence.erl +++ b/src/emqx_mod_presence.erl @@ -42,12 +42,15 @@ load(Env) -> on_client_connected(#{client_id := ClientId, username := Username, peername := {IpAddr, _}}, ConnAck, ConnAttrs, Env) -> - Attrs = lists:filter(fun({K, _}) -> lists:member(K, ?ATTR_KEYS) end, ConnAttrs), - case emqx_json:safe_encode([{clientid, ClientId}, - {username, Username}, - {ipaddress, iolist_to_binary(esockd_net:ntoa(IpAddr))}, - {connack, ConnAck}, - {ts, os:system_time(second)} | Attrs]) of + Attrs = maps:filter(fun(K, _) -> + lists:member(K, ?ATTR_KEYS) + end, ConnAttrs), + case emqx_json:safe_encode(Attrs#{clientid => ClientId, + username => Username, + ipaddress => iolist_to_binary(esockd_net:ntoa(IpAddr)), + connack => ConnAck, + ts => os:system_time(second) + }) of {ok, Payload} -> emqx:publish(message(qos(Env), topic(connected, ClientId), Payload)); {error, Reason} -> @@ -84,4 +87,3 @@ qos(Env) -> proplists:get_value(qos, Env, 0). reason(Reason) when is_atom(Reason) -> Reason; reason({Error, _}) when is_atom(Error) -> Error; reason(_) -> internal_error. - diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 06b0e7886..54baac636 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -139,12 +139,13 @@ info(PState = #pstate{conn_props = ConnProps, topic_aliases = Aliases, will_msg = WillMsg, enable_acl = EnableAcl}) -> - attrs(PState) ++ [{conn_props, ConnProps}, - {ack_props, AckProps}, - {session, Session}, - {topic_aliases, Aliases}, - {will_msg, WillMsg}, - {enable_acl, EnableAcl}]. + maps:merge(attrs(PState), #{conn_props => ConnProps, + ack_props => AckProps, + session => Session, + topic_aliases => Aliases, + will_msg => WillMsg, + enable_acl => EnableAcl + }). attrs(#pstate{zone = Zone, client_id = ClientId, @@ -159,20 +160,20 @@ attrs(#pstate{zone = Zone, connected_at = ConnectedAt, conn_mod = ConnMod, credentials = Credentials}) -> - [{zone, Zone}, - {client_id, ClientId}, - {username, Username}, - {peername, Peername}, - {peercert, Peercert}, - {proto_ver, ProtoVer}, - {proto_name, ProtoName}, - {clean_start, CleanStart}, - {keepalive, Keepalive}, - {is_bridge, IsBridge}, - {connected_at, ConnectedAt}, - {conn_mod, ConnMod}, - {credentials, Credentials} - ]. + #{ zone => Zone + , client_id => ClientId + , username => Username + , peername => Peername + , peercert => Peercert + , proto_ver => ProtoVer + , proto_name => ProtoName + , clean_start => CleanStart + , keepalive => Keepalive + , is_bridge => IsBridge + , connected_at => ConnectedAt + , conn_mod => ConnMod + , credentials => Credentials + }. attr(max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) -> get_property('Receive-Maximum', ConnProps, 65535); diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 1d6f3e244..44e1eb681 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -61,11 +61,11 @@ info(#state{peername = Peername, sockname = Sockname, proto_state = ProtoState}) -> ProtoInfo = emqx_protocol:info(ProtoState), - ConnInfo = [{socktype, websocket}, - {conn_state, running}, - {peername, Peername}, - {sockname, Sockname}], - lists:append([ConnInfo, ProtoInfo]). + ConnInfo = #{socktype => websocket, + conn_state => running, + peername => Peername, + sockname => Sockname}, + maps:merge(ProtoInfo, ConnInfo). %% for dashboard attrs(WSPid) when is_pid(WSPid) -> @@ -74,10 +74,10 @@ attrs(WSPid) when is_pid(WSPid) -> attrs(#state{peername = Peername, sockname = Sockname, proto_state = ProtoState}) -> - SockAttrs = [{peername, Peername}, - {sockname, Sockname}], + SockAttrs = #{peername => Peername, + sockname => Sockname}, ProtoAttrs = emqx_protocol:attrs(ProtoState), - lists:usort(lists:append(SockAttrs, ProtoAttrs)). + maps:merge(SockAttrs, ProtoAttrs). stats(WSPid) when is_pid(WSPid) -> call(WSPid, stats); diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index 9c9c3ab55..bd2293c97 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -51,16 +51,16 @@ t_connect_api(_Config) -> emqx_client:disconnect(T1). t_info(ConnInfo) -> - ?assertEqual(tcp, proplists:get_value(socktype, ConnInfo)), - ?assertEqual(running, proplists:get_value(conn_state, ConnInfo)), - ?assertEqual(<<"client1">>, proplists:get_value(client_id, ConnInfo)), - ?assertEqual(<<"testuser1">>, proplists:get_value(username, ConnInfo)), - ?assertEqual(<<"MQTT">>, proplists:get_value(proto_name, ConnInfo)). + ?assertEqual(tcp, maps:get(socktype, ConnInfo)), + ?assertEqual(running, maps:get(conn_state, ConnInfo)), + ?assertEqual(<<"client1">>, maps:get(client_id, ConnInfo)), + ?assertEqual(<<"testuser1">>, maps:get(username, ConnInfo)), + ?assertEqual(<<"MQTT">>, maps:get(proto_name, ConnInfo)). t_attrs(AttrsData) -> - ?assertEqual(<<"client1">>, proplists:get_value(client_id, AttrsData)), - ?assertEqual(emqx_connection, proplists:get_value(conn_mod, AttrsData)), - ?assertEqual(<<"testuser1">>, proplists:get_value(username, AttrsData)). + ?assertEqual(<<"client1">>, maps:get(client_id, AttrsData)), + ?assertEqual(emqx_connection, maps:get(conn_mod, AttrsData)), + ?assertEqual(<<"testuser1">>, maps:get(username, AttrsData)). t_stats(StatsData) -> ?assertEqual(true, proplists:get_value(recv_oct, StatsData) >= 0), diff --git a/test/emqx_ws_connection_SUITE.erl b/test/emqx_ws_connection_SUITE.erl index c45344bae..289608428 100644 --- a/test/emqx_ws_connection_SUITE.erl +++ b/test/emqx_ws_connection_SUITE.erl @@ -73,16 +73,16 @@ raw_recv_pase(P) -> version => ?MQTT_PROTO_V4} }). t_info(InfoData) -> - ?assertEqual(websocket, proplists:get_value(socktype, InfoData)), - ?assertEqual(running, proplists:get_value(conn_state, InfoData)), - ?assertEqual(<<"mqtt_client">>, proplists:get_value(client_id, InfoData)), - ?assertEqual(<<"admin">>, proplists:get_value(username, InfoData)), - ?assertEqual(<<"MQTT">>, proplists:get_value(proto_name, InfoData)). + ?assertEqual(websocket, maps:get(socktype, InfoData)), + ?assertEqual(running, maps:get(conn_state, InfoData)), + ?assertEqual(<<"mqtt_client">>, maps:get(client_id, InfoData)), + ?assertEqual(<<"admin">>, maps:get(username, InfoData)), + ?assertEqual(<<"MQTT">>, maps:get(proto_name, InfoData)). t_attrs(AttrsData) -> - ?assertEqual(<<"mqtt_client">>, proplists:get_value(client_id, AttrsData)), - ?assertEqual(emqx_ws_connection, proplists:get_value(conn_mod, AttrsData)), - ?assertEqual(<<"admin">>, proplists:get_value(username, AttrsData)). + ?assertEqual(<<"mqtt_client">>, maps:get(client_id, AttrsData)), + ?assertEqual(emqx_ws_connection, maps:get(conn_mod, AttrsData)), + ?assertEqual(<<"admin">>, maps:get(username, AttrsData)). t_stats(StatsData) -> ?assertEqual(true, proplists:get_value(recv_oct, StatsData) >= 0), @@ -91,4 +91,4 @@ t_stats(StatsData) -> ?assertEqual(true, proplists:get_value(reductions, StatsData) >=0), ?assertEqual(true, proplists:get_value(recv_pkt, StatsData) =:=1), ?assertEqual(true, proplists:get_value(recv_msg, StatsData) >=0), - ?assertEqual(true, proplists:get_value(send_pkt, StatsData) =:=1). \ No newline at end of file + ?assertEqual(true, proplists:get_value(send_pkt, StatsData) =:=1).