Merge pull request #2429 from emqx/develop
Auto-pull-request-by-2019-04-14
This commit is contained in:
commit
1f4d8483bf
|
@ -119,6 +119,8 @@
|
|||
-define(DEFAULT_SEND_AHEAD, 8).
|
||||
-define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)).
|
||||
-define(DEFAULT_SEG_BYTES, (1 bsl 20)).
|
||||
-define(NO_BRIDGE_HANDLER, undefined).
|
||||
-define(NO_FROM, undefined).
|
||||
-define(maybe_send, {next_event, internal, maybe_send}).
|
||||
|
||||
%% @doc Start a bridge worker. Supported configs:
|
||||
|
@ -277,7 +279,8 @@ init(Config) ->
|
|||
subscriptions => Subs,
|
||||
replayq => Queue,
|
||||
inflight => [],
|
||||
connection => undefined
|
||||
connection => undefined,
|
||||
bridge_handler => Get(bridge_handler, ?NO_BRIDGE_HANDLER)
|
||||
}}.
|
||||
|
||||
code_change(_Vsn, State, Data, _Extra) ->
|
||||
|
@ -295,8 +298,7 @@ standing_by(enter, _, #{start_type := auto}) ->
|
|||
standing_by(enter, _, #{start_type := manual}) ->
|
||||
keep_state_and_data;
|
||||
standing_by({call, From}, ensure_started, State) ->
|
||||
{next_state, connecting, State,
|
||||
[{reply, From, ok}]};
|
||||
do_connect({call, From}, standing_by, State);
|
||||
standing_by(state_timeout, do_connect, State) ->
|
||||
{next_state, connecting, State};
|
||||
standing_by(info, Info, State) ->
|
||||
|
@ -311,21 +313,8 @@ standing_by(Type, Content, State) ->
|
|||
connecting(enter, connected, #{reconnect_delay_ms := Timeout}) ->
|
||||
Action = {state_timeout, Timeout, reconnect},
|
||||
{keep_state_and_data, Action};
|
||||
connecting(enter, _, #{reconnect_delay_ms := Timeout,
|
||||
connect_fun := ConnectFun,
|
||||
subscriptions := Subs,
|
||||
forwards := Forwards
|
||||
} = State) ->
|
||||
ok = subscribe_local_topics(Forwards),
|
||||
case ConnectFun(Subs) of
|
||||
{ok, ConnRef, Conn} ->
|
||||
?LOG(info, "[Bridge] Bridge ~p connected", [name()]),
|
||||
Action = {state_timeout, 0, connected},
|
||||
{keep_state, State#{conn_ref => ConnRef, connection => Conn}, Action};
|
||||
error ->
|
||||
Action = {state_timeout, Timeout, reconnect},
|
||||
{keep_state_and_data, Action}
|
||||
end;
|
||||
connecting(enter, _, State) ->
|
||||
do_connect(enter, connecting, State);
|
||||
connecting(state_timeout, connected, State) ->
|
||||
{next_state, connected, State};
|
||||
connecting(state_timeout, reconnect, _State) ->
|
||||
|
@ -416,6 +405,12 @@ common(StateName, Type, Content, State) ->
|
|||
[name(), Type, StateName, Content]),
|
||||
{keep_state, State}.
|
||||
|
||||
eval_bridge_handler(State = #{bridge_handler := ?NO_BRIDGE_HANDLER}, _Msg) ->
|
||||
State;
|
||||
eval_bridge_handler(State = #{bridge_handler := Handler}, Msg) ->
|
||||
Handler(Msg),
|
||||
State.
|
||||
|
||||
ensure_present(Key, Topic, State) ->
|
||||
Topics = maps:get(Key, State),
|
||||
case is_topic_present(Topic, Topics) of
|
||||
|
@ -445,6 +440,35 @@ is_topic_present({Topic, _QoS}, Topics) ->
|
|||
is_topic_present(Topic, Topics) ->
|
||||
lists:member(Topic, Topics) orelse false =/= lists:keyfind(Topic, 1, Topics).
|
||||
|
||||
do_connect(Type, StateName, #{ forwards := Forwards
|
||||
, subscriptions := Subs
|
||||
, connect_fun := ConnectFun
|
||||
, reconnect_delay_ms := Timeout
|
||||
} = State) ->
|
||||
ok = subscribe_local_topics(Forwards),
|
||||
From = case StateName of
|
||||
standing_by -> {call, Pid} = Type, Pid;
|
||||
connecting -> ?NO_FROM
|
||||
end,
|
||||
DoEvent = fun (standing_by, StandingbyAction, _ConnectingAction) ->
|
||||
StandingbyAction;
|
||||
(connecting, _StandingbyAction, ConnectingAction) ->
|
||||
ConnectingAction
|
||||
end,
|
||||
case ConnectFun(Subs) of
|
||||
{ok, ConnRef, Conn} ->
|
||||
?LOG(info, "[Bridge] Bridge ~p connected", [name()]),
|
||||
State0 = State#{conn_ref => ConnRef, connection => Conn},
|
||||
State1 = eval_bridge_handler(State0, connected),
|
||||
StandingbyAction = {next_state, connected, State1, [{reply, From, ok}]},
|
||||
ConnectingAction = {keep_state, State1, {state_timeout, 0, connected}},
|
||||
DoEvent(StateName, StandingbyAction, ConnectingAction);
|
||||
{error, Reason} ->
|
||||
StandingbyAction = {keep_state_and_data, [{reply, From, {error, Reason}}]},
|
||||
ConnectingAction = {keep_state_and_data, {state_timeout, Timeout, reconnect}},
|
||||
DoEvent(StateName, StandingbyAction, ConnectingAction)
|
||||
end.
|
||||
|
||||
do_ensure_present(forwards, Topic, _) ->
|
||||
ok = subscribe_local_topic(Topic);
|
||||
do_ensure_present(subscriptions, _Topic, #{connect_module := _ConnectModule,
|
||||
|
@ -553,9 +577,10 @@ disconnect(#{connection := Conn,
|
|||
connect_module := Module
|
||||
} = State) when Conn =/= undefined ->
|
||||
ok = Module:stop(ConnRef, Conn),
|
||||
State#{conn_ref => undefined,
|
||||
connection => undefined};
|
||||
disconnect(State) -> State.
|
||||
State0 = State#{conn_ref => undefined, connection => undefined},
|
||||
eval_bridge_handler(State0, disconnected);
|
||||
disconnect(State) ->
|
||||
eval_bridge_handler(State, disconnected).
|
||||
|
||||
%% Called only when replayq needs to dump it to disk.
|
||||
msg_marshaller(Bin) when is_binary(Bin) -> emqx_bridge_msg:from_binary(Bin);
|
||||
|
|
|
@ -56,7 +56,7 @@ start(Module, Config) ->
|
|||
Config1 = obfuscate(Config),
|
||||
?LOG(error, "[Bridge connect] Failed to connect with module=~p\n"
|
||||
"config=~p\nreason:~p", [Module, Config1, Reason]),
|
||||
error
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
obfuscate(Map) ->
|
||||
|
@ -69,4 +69,3 @@ obfuscate(Map) ->
|
|||
|
||||
is_sensitive(password) -> true;
|
||||
is_sensitive(_) -> false.
|
||||
|
||||
|
|
|
@ -20,11 +20,12 @@
|
|||
%% APIs
|
||||
-export([ start_link/0
|
||||
, start_link/1
|
||||
, bridges/0
|
||||
]).
|
||||
|
||||
-export([ create_bridge/2
|
||||
, drop_bridge/1
|
||||
, bridges/0
|
||||
, is_bridge_exist/1
|
||||
]).
|
||||
|
||||
%% supervisor callbacks
|
||||
|
@ -58,6 +59,13 @@ bridge_spec({Name, Config}) ->
|
|||
bridges() ->
|
||||
[{Name, emqx_bridge:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?SUP)].
|
||||
|
||||
-spec(is_bridge_exist(atom() | pid()) -> boolean()).
|
||||
is_bridge_exist(Id) ->
|
||||
case supervisor:get_childspec(?SUP, Id) of
|
||||
{ok, _ChildSpec} -> true;
|
||||
{error, _Error} -> false
|
||||
end.
|
||||
|
||||
create_bridge(Id, Config) ->
|
||||
supervisor:start_child(?SUP, bridge_spec({Id, Config})).
|
||||
|
||||
|
@ -66,7 +74,6 @@ drop_bridge(Id) ->
|
|||
ok ->
|
||||
supervisor:delete_child(?SUP, Id);
|
||||
Error ->
|
||||
?LOG(error, "[Bridge] Delete bridge failed", [Error]),
|
||||
?LOG(error, "[Bridge] Delete bridge failed, error : ~p", [Error]),
|
||||
Error
|
||||
end.
|
||||
|
||||
|
|
|
@ -928,6 +928,11 @@ handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) ->
|
|||
?LOG(error, "[Client] Got tcp error: ~p", [Reason]),
|
||||
{stop, {shutdown, Reason}, State};
|
||||
|
||||
handle_event(info, EventContent = {'EXIT', _Pid, normal}, StateName, _State) ->
|
||||
?LOG(info, "[Client] State: ~s, Unexpected Event: (info, ~p)",
|
||||
[StateName, EventContent]),
|
||||
keep_state_and_data;
|
||||
|
||||
handle_event(EventType, EventContent, StateName, _StateData) ->
|
||||
?LOG(error, "[Client] State: ~s, Unexpected Event: (~p, ~p)",
|
||||
[StateName, EventType, EventContent]),
|
||||
|
@ -1049,7 +1054,7 @@ timeout_calls(Timeout, Calls) ->
|
|||
timeout_calls(Now, Timeout, Calls) ->
|
||||
lists:foldl(fun(C = #call{from = From, ts = Ts}, Acc) ->
|
||||
case (timer:now_diff(Now, Ts) div 1000) >= Timeout of
|
||||
true -> gen_statem:reply(From, {error, ack_timeout}),
|
||||
true -> From ! {error, ack_timeout},
|
||||
Acc;
|
||||
false -> [C | Acc]
|
||||
end
|
||||
|
@ -1231,4 +1236,3 @@ bump_last_packet_id(State = #state{last_packet_id = Id}) ->
|
|||
-spec next_packet_id(packet_id()) -> packet_id().
|
||||
next_packet_id(?MAX_PACKET_ID) -> 1;
|
||||
next_packet_id(Id) -> Id + 1.
|
||||
|
||||
|
|
|
@ -87,15 +87,15 @@ info(#state{transport = Transport,
|
|||
rate_limit = RateLimit,
|
||||
pub_limit = PubLimit,
|
||||
proto_state = ProtoState}) ->
|
||||
ConnInfo = [{socktype, Transport:type(Socket)},
|
||||
{peername, Peername},
|
||||
{sockname, Sockname},
|
||||
{conn_state, ConnState},
|
||||
{active_n, ActiveN},
|
||||
{rate_limit, rate_limit_info(RateLimit)},
|
||||
{pub_limit, rate_limit_info(PubLimit)}],
|
||||
ConnInfo = #{socktype => Transport:type(Socket),
|
||||
peername => Peername,
|
||||
sockname => Sockname,
|
||||
conn_state => ConnState,
|
||||
active_n => ActiveN,
|
||||
rate_limit => rate_limit_info(RateLimit),
|
||||
pub_limit => rate_limit_info(PubLimit)},
|
||||
ProtoInfo = emqx_protocol:info(ProtoState),
|
||||
lists:usort(lists:append(ConnInfo, ProtoInfo)).
|
||||
maps:merge(ConnInfo, ProtoInfo).
|
||||
|
||||
rate_limit_info(undefined) ->
|
||||
#{};
|
||||
|
@ -109,10 +109,10 @@ attrs(CPid) when is_pid(CPid) ->
|
|||
attrs(#state{peername = Peername,
|
||||
sockname = Sockname,
|
||||
proto_state = ProtoState}) ->
|
||||
SockAttrs = [{peername, Peername},
|
||||
{sockname, Sockname}],
|
||||
SockAttrs = #{peername => Peername,
|
||||
sockname => Sockname},
|
||||
ProtoAttrs = emqx_protocol:attrs(ProtoState),
|
||||
lists:usort(lists:append(SockAttrs, ProtoAttrs)).
|
||||
maps:merge(SockAttrs, ProtoAttrs).
|
||||
|
||||
%% Conn stats
|
||||
stats(CPid) when is_pid(CPid) ->
|
||||
|
|
|
@ -42,12 +42,15 @@ load(Env) ->
|
|||
on_client_connected(#{client_id := ClientId,
|
||||
username := Username,
|
||||
peername := {IpAddr, _}}, ConnAck, ConnAttrs, Env) ->
|
||||
Attrs = lists:filter(fun({K, _}) -> lists:member(K, ?ATTR_KEYS) end, ConnAttrs),
|
||||
case emqx_json:safe_encode([{clientid, ClientId},
|
||||
{username, Username},
|
||||
{ipaddress, iolist_to_binary(esockd_net:ntoa(IpAddr))},
|
||||
{connack, ConnAck},
|
||||
{ts, os:system_time(second)} | Attrs]) of
|
||||
Attrs = maps:filter(fun(K, _) ->
|
||||
lists:member(K, ?ATTR_KEYS)
|
||||
end, ConnAttrs),
|
||||
case emqx_json:safe_encode(Attrs#{clientid => ClientId,
|
||||
username => Username,
|
||||
ipaddress => iolist_to_binary(esockd_net:ntoa(IpAddr)),
|
||||
connack => ConnAck,
|
||||
ts => os:system_time(second)
|
||||
}) of
|
||||
{ok, Payload} ->
|
||||
emqx:publish(message(qos(Env), topic(connected, ClientId), Payload));
|
||||
{error, Reason} ->
|
||||
|
@ -84,4 +87,3 @@ qos(Env) -> proplists:get_value(qos, Env, 0).
|
|||
reason(Reason) when is_atom(Reason) -> Reason;
|
||||
reason({Error, _}) when is_atom(Error) -> Error;
|
||||
reason(_) -> internal_error.
|
||||
|
||||
|
|
|
@ -68,7 +68,8 @@
|
|||
ignore_loop,
|
||||
topic_alias_maximum,
|
||||
conn_mod,
|
||||
credentials
|
||||
credentials,
|
||||
ws_cookie
|
||||
}).
|
||||
|
||||
-opaque(state() :: #pstate{}).
|
||||
|
@ -85,7 +86,9 @@
|
|||
%%------------------------------------------------------------------------------
|
||||
|
||||
-spec(init(map(), list()) -> state()).
|
||||
init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) ->
|
||||
init(SocketOpts = #{ peername := Peername
|
||||
, peercert := Peercert
|
||||
, sendfun := SendFun}, Options) ->
|
||||
Zone = proplists:get_value(zone, Options),
|
||||
#pstate{zone = Zone,
|
||||
sendfun = SendFun,
|
||||
|
@ -110,7 +113,8 @@ init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendF
|
|||
ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false),
|
||||
topic_alias_maximum = #{to_client => 0, from_client => 0},
|
||||
conn_mod = maps:get(conn_mod, SocketOpts, undefined),
|
||||
credentials = #{}}.
|
||||
credentials = #{},
|
||||
ws_cookie = maps:get(ws_cookie, SocketOpts, undefined)}.
|
||||
|
||||
init_username(Peercert, Options) ->
|
||||
case proplists:get_value(peer_cert_as_username, Options) of
|
||||
|
@ -135,12 +139,13 @@ info(PState = #pstate{conn_props = ConnProps,
|
|||
topic_aliases = Aliases,
|
||||
will_msg = WillMsg,
|
||||
enable_acl = EnableAcl}) ->
|
||||
attrs(PState) ++ [{conn_props, ConnProps},
|
||||
{ack_props, AckProps},
|
||||
{session, Session},
|
||||
{topic_aliases, Aliases},
|
||||
{will_msg, WillMsg},
|
||||
{enable_acl, EnableAcl}].
|
||||
maps:merge(attrs(PState), #{conn_props => ConnProps,
|
||||
ack_props => AckProps,
|
||||
session => Session,
|
||||
topic_aliases => Aliases,
|
||||
will_msg => WillMsg,
|
||||
enable_acl => EnableAcl
|
||||
}).
|
||||
|
||||
attrs(#pstate{zone = Zone,
|
||||
client_id = ClientId,
|
||||
|
@ -155,20 +160,20 @@ attrs(#pstate{zone = Zone,
|
|||
connected_at = ConnectedAt,
|
||||
conn_mod = ConnMod,
|
||||
credentials = Credentials}) ->
|
||||
[{zone, Zone},
|
||||
{client_id, ClientId},
|
||||
{username, Username},
|
||||
{peername, Peername},
|
||||
{peercert, Peercert},
|
||||
{proto_ver, ProtoVer},
|
||||
{proto_name, ProtoName},
|
||||
{clean_start, CleanStart},
|
||||
{keepalive, Keepalive},
|
||||
{is_bridge, IsBridge},
|
||||
{connected_at, ConnectedAt},
|
||||
{conn_mod, ConnMod},
|
||||
{credentials, Credentials}
|
||||
].
|
||||
#{ zone => Zone
|
||||
, client_id => ClientId
|
||||
, username => Username
|
||||
, peername => Peername
|
||||
, peercert => Peercert
|
||||
, proto_ver => ProtoVer
|
||||
, proto_name => ProtoName
|
||||
, clean_start => CleanStart
|
||||
, keepalive => Keepalive
|
||||
, is_bridge => IsBridge
|
||||
, connected_at => ConnectedAt
|
||||
, conn_mod => ConnMod
|
||||
, credentials => Credentials
|
||||
}.
|
||||
|
||||
attr(max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) ->
|
||||
get_property('Receive-Maximum', ConnProps, 65535);
|
||||
|
@ -202,11 +207,13 @@ credentials(#pstate{zone = Zone,
|
|||
client_id = ClientId,
|
||||
username = Username,
|
||||
peername = Peername,
|
||||
peercert = Peercert}) ->
|
||||
peercert = Peercert,
|
||||
ws_cookie = WsCookie}) ->
|
||||
with_cert(#{zone => Zone,
|
||||
client_id => ClientId,
|
||||
username => Username,
|
||||
peername => Peername,
|
||||
ws_cookie => WsCookie,
|
||||
mountpoint => emqx_zone:get_env(Zone, mountpoint)}, Peercert).
|
||||
|
||||
with_cert(Credentials, undefined) -> Credentials;
|
||||
|
|
|
@ -676,6 +676,7 @@ terminate(Reason, #state{will_msg = WillMsg,
|
|||
username = Username,
|
||||
conn_pid = ConnPid,
|
||||
old_conn_pid = OldConnPid}) ->
|
||||
emqx_metrics:commit(),
|
||||
send_willmsg(WillMsg),
|
||||
[maybe_shutdown(Pid, Reason) || Pid <- [ConnPid, OldConnPid]],
|
||||
ok = emqx_hooks:run('session.terminated', [#{client_id => ClientId, username => Username}, Reason]).
|
||||
|
|
|
@ -76,11 +76,21 @@ trace(publish, #message{from = From, topic = Topic, payload = Payload})
|
|||
%% @doc Start to trace client_id or topic.
|
||||
-spec(start_trace(trace_who(), logger:level(), string()) -> ok | {error, term()}).
|
||||
start_trace({client_id, ClientId}, Level, LogFile) ->
|
||||
start_trace({start_trace, {client_id, ClientId}, Level, LogFile});
|
||||
do_start_trace({client_id, ClientId}, Level, LogFile);
|
||||
start_trace({topic, Topic}, Level, LogFile) ->
|
||||
start_trace({start_trace, {topic, Topic}, Level, LogFile}).
|
||||
do_start_trace({topic, Topic}, Level, LogFile).
|
||||
|
||||
start_trace(Req) -> gen_server:call(?MODULE, Req, infinity).
|
||||
do_start_trace(Who, Level, LogFile) ->
|
||||
#{level := PrimaryLevel} = logger:get_primary_config(),
|
||||
try logger:compare_levels(log_level(Level), PrimaryLevel) of
|
||||
lt ->
|
||||
{error, io_lib:format("Cannot trace at a log level (~s) lower than the primary log level (~s)", [Level, PrimaryLevel])};
|
||||
_GtOrEq ->
|
||||
gen_server:call(?MODULE, {start_trace, Who, Level, LogFile}, 5000)
|
||||
catch
|
||||
_:Error ->
|
||||
{error, Error}
|
||||
end.
|
||||
|
||||
%% @doc Stop tracing client_id or topic.
|
||||
-spec(stop_trace(trace_who()) -> ok | {error, term()}).
|
||||
|
@ -109,7 +119,7 @@ handle_call({start_trace, Who, Level, LogFile}, _From, State = #state{traces = T
|
|||
config => #{type => halt, file => LogFile},
|
||||
filter_default => stop,
|
||||
filters => [{meta_key_filter,
|
||||
{fun filter_by_meta_key/2, Who} }]}) of
|
||||
{fun filter_by_meta_key/2, Who} }]}) of
|
||||
ok ->
|
||||
?LOG(info, "[Tracer] Start trace for ~p", [Who]),
|
||||
{reply, ok, State#state{traces = maps:put(Who, {Level, LogFile}, Traces)}};
|
||||
|
@ -168,3 +178,14 @@ filter_by_meta_key(#{meta:=Meta}=LogEvent, {MetaKey, MetaValue}) ->
|
|||
end;
|
||||
_ -> ignore
|
||||
end.
|
||||
|
||||
log_level(emergency) -> emergency;
|
||||
log_level(alert) -> alert;
|
||||
log_level(critical) -> critical;
|
||||
log_level(error) -> error;
|
||||
log_level(warning) -> warning;
|
||||
log_level(notice) -> notice;
|
||||
log_level(info) -> info;
|
||||
log_level(debug) -> debug;
|
||||
log_level(all) -> debug;
|
||||
log_level(_) -> throw(invalid_log_level).
|
||||
|
|
|
@ -61,11 +61,11 @@ info(#state{peername = Peername,
|
|||
sockname = Sockname,
|
||||
proto_state = ProtoState}) ->
|
||||
ProtoInfo = emqx_protocol:info(ProtoState),
|
||||
ConnInfo = [{socktype, websocket},
|
||||
{conn_state, running},
|
||||
{peername, Peername},
|
||||
{sockname, Sockname}],
|
||||
lists:append([ConnInfo, ProtoInfo]).
|
||||
ConnInfo = #{socktype => websocket,
|
||||
conn_state => running,
|
||||
peername => Peername,
|
||||
sockname => Sockname},
|
||||
maps:merge(ProtoInfo, ConnInfo).
|
||||
|
||||
%% for dashboard
|
||||
attrs(WSPid) when is_pid(WSPid) ->
|
||||
|
@ -74,10 +74,10 @@ attrs(WSPid) when is_pid(WSPid) ->
|
|||
attrs(#state{peername = Peername,
|
||||
sockname = Sockname,
|
||||
proto_state = ProtoState}) ->
|
||||
SockAttrs = [{peername, Peername},
|
||||
{sockname, Sockname}],
|
||||
SockAttrs = #{peername => Peername,
|
||||
sockname => Sockname},
|
||||
ProtoAttrs = emqx_protocol:attrs(ProtoState),
|
||||
lists:usort(lists:append(SockAttrs, ProtoAttrs)).
|
||||
maps:merge(SockAttrs, ProtoAttrs).
|
||||
|
||||
stats(WSPid) when is_pid(WSPid) ->
|
||||
call(WSPid, stats);
|
||||
|
@ -138,10 +138,22 @@ websocket_init(#state{request = Req, options = Options}) ->
|
|||
Peername = cowboy_req:peer(Req),
|
||||
Sockname = cowboy_req:sock(Req),
|
||||
Peercert = cowboy_req:cert(Req),
|
||||
WsCookie = try cowboy_req:parse_cookies(Req)
|
||||
catch
|
||||
error:badarg ->
|
||||
?LOG(error, "[WS Connection] Illegal cookie"),
|
||||
undefined;
|
||||
Error:Reason ->
|
||||
?LOG(error,
|
||||
"[WS Connection] Cookie is parsed failed, Error: ~p, Reason ~p",
|
||||
[Error, Reason]),
|
||||
undefined
|
||||
end,
|
||||
ProtoState = emqx_protocol:init(#{peername => Peername,
|
||||
sockname => Sockname,
|
||||
peercert => Peercert,
|
||||
sendfun => send_fun(self()),
|
||||
ws_cookie => WsCookie,
|
||||
conn_mod => ?MODULE}, Options),
|
||||
ParserState = emqx_protocol:parser(ProtoState),
|
||||
Zone = proplists:get_value(zone, Options),
|
||||
|
|
|
@ -51,16 +51,16 @@ t_connect_api(_Config) ->
|
|||
emqx_client:disconnect(T1).
|
||||
|
||||
t_info(ConnInfo) ->
|
||||
?assertEqual(tcp, proplists:get_value(socktype, ConnInfo)),
|
||||
?assertEqual(running, proplists:get_value(conn_state, ConnInfo)),
|
||||
?assertEqual(<<"client1">>, proplists:get_value(client_id, ConnInfo)),
|
||||
?assertEqual(<<"testuser1">>, proplists:get_value(username, ConnInfo)),
|
||||
?assertEqual(<<"MQTT">>, proplists:get_value(proto_name, ConnInfo)).
|
||||
?assertEqual(tcp, maps:get(socktype, ConnInfo)),
|
||||
?assertEqual(running, maps:get(conn_state, ConnInfo)),
|
||||
?assertEqual(<<"client1">>, maps:get(client_id, ConnInfo)),
|
||||
?assertEqual(<<"testuser1">>, maps:get(username, ConnInfo)),
|
||||
?assertEqual(<<"MQTT">>, maps:get(proto_name, ConnInfo)).
|
||||
|
||||
t_attrs(AttrsData) ->
|
||||
?assertEqual(<<"client1">>, proplists:get_value(client_id, AttrsData)),
|
||||
?assertEqual(emqx_connection, proplists:get_value(conn_mod, AttrsData)),
|
||||
?assertEqual(<<"testuser1">>, proplists:get_value(username, AttrsData)).
|
||||
?assertEqual(<<"client1">>, maps:get(client_id, AttrsData)),
|
||||
?assertEqual(emqx_connection, maps:get(conn_mod, AttrsData)),
|
||||
?assertEqual(<<"testuser1">>, maps:get(username, AttrsData)).
|
||||
|
||||
t_stats(StatsData) ->
|
||||
?assertEqual(true, proplists:get_value(recv_oct, StatsData) >= 0),
|
||||
|
|
|
@ -38,8 +38,12 @@ start_traces(_Config) ->
|
|||
emqx_client:connect(T),
|
||||
|
||||
%% Start tracing
|
||||
emqx_logger:set_log_level(error),
|
||||
{error, _} = emqx_tracer:start_trace({client_id, <<"client">>}, debug, "tmp/client.log"),
|
||||
emqx_logger:set_log_level(debug),
|
||||
ok = emqx_tracer:start_trace({client_id, <<"client">>}, debug, "tmp/client.log"),
|
||||
ok = emqx_tracer:start_trace({client_id, <<"client2">>}, all, "tmp/client2.log"),
|
||||
{error, invalid_log_level} = emqx_tracer:start_trace({client_id, <<"client3">>}, bad_level, "tmp/client3.log"),
|
||||
ok = emqx_tracer:start_trace({topic, <<"a/#">>}, all, "tmp/topic_trace.log"),
|
||||
ct:sleep(100),
|
||||
|
||||
|
|
|
@ -73,16 +73,16 @@ raw_recv_pase(P) ->
|
|||
version => ?MQTT_PROTO_V4} }).
|
||||
|
||||
t_info(InfoData) ->
|
||||
?assertEqual(websocket, proplists:get_value(socktype, InfoData)),
|
||||
?assertEqual(running, proplists:get_value(conn_state, InfoData)),
|
||||
?assertEqual(<<"mqtt_client">>, proplists:get_value(client_id, InfoData)),
|
||||
?assertEqual(<<"admin">>, proplists:get_value(username, InfoData)),
|
||||
?assertEqual(<<"MQTT">>, proplists:get_value(proto_name, InfoData)).
|
||||
?assertEqual(websocket, maps:get(socktype, InfoData)),
|
||||
?assertEqual(running, maps:get(conn_state, InfoData)),
|
||||
?assertEqual(<<"mqtt_client">>, maps:get(client_id, InfoData)),
|
||||
?assertEqual(<<"admin">>, maps:get(username, InfoData)),
|
||||
?assertEqual(<<"MQTT">>, maps:get(proto_name, InfoData)).
|
||||
|
||||
t_attrs(AttrsData) ->
|
||||
?assertEqual(<<"mqtt_client">>, proplists:get_value(client_id, AttrsData)),
|
||||
?assertEqual(emqx_ws_connection, proplists:get_value(conn_mod, AttrsData)),
|
||||
?assertEqual(<<"admin">>, proplists:get_value(username, AttrsData)).
|
||||
?assertEqual(<<"mqtt_client">>, maps:get(client_id, AttrsData)),
|
||||
?assertEqual(emqx_ws_connection, maps:get(conn_mod, AttrsData)),
|
||||
?assertEqual(<<"admin">>, maps:get(username, AttrsData)).
|
||||
|
||||
t_stats(StatsData) ->
|
||||
?assertEqual(true, proplists:get_value(recv_oct, StatsData) >= 0),
|
||||
|
|
|
@ -36,7 +36,7 @@ new(WsUrl, PPid) ->
|
|||
addr = Addr,
|
||||
path = "/" ++ Path,
|
||||
ppid = PPid},
|
||||
spawn(fun () ->
|
||||
spawn(fun() ->
|
||||
start_conn(State)
|
||||
end).
|
||||
|
||||
|
|
Loading…
Reference in New Issue