commit
8bc0ba9a7b
|
@ -179,7 +179,7 @@ end}.
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{mapping, "log.syslog.identity", "lager.handlers", [
|
{mapping, "log.syslog.identity", "lager.handlers", [
|
||||||
{default, "emq"},
|
{default, "emqttd"},
|
||||||
{datatype, string}
|
{datatype, string}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
|
|
@ -228,7 +228,7 @@ handle_info({'EXIT', WsPid, Reason}, State = #wsclient_state{ws_pid = WsPid}) ->
|
||||||
%% The session process exited unexpectedly.
|
%% The session process exited unexpectedly.
|
||||||
handle_info({'EXIT', Pid, Reason}, State = #wsclient_state{proto_state = ProtoState}) ->
|
handle_info({'EXIT', Pid, Reason}, State = #wsclient_state{proto_state = ProtoState}) ->
|
||||||
case emqttd_protocol:session(ProtoState) of
|
case emqttd_protocol:session(ProtoState) of
|
||||||
Pid -> shutdown(Reason, State);
|
Pid -> stop(Reason, State);
|
||||||
_ -> ?WSLOG(error, "Unexpected EXIT: ~p, Reason: ~p", [Pid, Reason], State),
|
_ -> ?WSLOG(error, "Unexpected EXIT: ~p, Reason: ~p", [Pid, Reason], State),
|
||||||
{noreply, State, hibernate}
|
{noreply, State, hibernate}
|
||||||
end;
|
end;
|
||||||
|
|
|
@ -103,12 +103,15 @@ groups() ->
|
||||||
cli_subscriptions,
|
cli_subscriptions,
|
||||||
cli_bridges,
|
cli_bridges,
|
||||||
cli_plugins,
|
cli_plugins,
|
||||||
cli_listeners,
|
{listeners, [sequence],
|
||||||
|
[cli_listeners,
|
||||||
|
conflict_listeners
|
||||||
|
]},
|
||||||
cli_vm]},
|
cli_vm]},
|
||||||
{cleanSession, [sequence],
|
{cleanSession, [sequence],
|
||||||
[cleanSession_validate,
|
[cleanSession_validate,
|
||||||
cleanSession_validate1,
|
cleanSession_validate1
|
||||||
cleanSession_validate2]}].
|
]}].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
application:start(lager),
|
application:start(lager),
|
||||||
|
@ -619,6 +622,35 @@ cli_bridges(_) ->
|
||||||
cli_listeners(_) ->
|
cli_listeners(_) ->
|
||||||
emqttd_cli:listeners([]).
|
emqttd_cli:listeners([]).
|
||||||
|
|
||||||
|
conflict_listeners(_) ->
|
||||||
|
F =
|
||||||
|
fun() ->
|
||||||
|
process_flag(trap_exit, true),
|
||||||
|
emqttc:start_link([{host, "localhost"},
|
||||||
|
{port, 1883},
|
||||||
|
{client_id, <<"c1">>},
|
||||||
|
{clean_sess, false}])
|
||||||
|
end,
|
||||||
|
spawn_link(F),
|
||||||
|
|
||||||
|
{ok, C2} = emqttc:start_link([{host, "localhost"},
|
||||||
|
{port, 1883},
|
||||||
|
{client_id, <<"c1">>},
|
||||||
|
{clean_sess, false}]),
|
||||||
|
timer:sleep(100),
|
||||||
|
|
||||||
|
Listeners =
|
||||||
|
lists:map(fun({{Protocol, ListenOn}, Pid}) ->
|
||||||
|
Key = atom_to_list(Protocol) ++ ":" ++ esockd:to_string(ListenOn),
|
||||||
|
{Key, [{acceptors, esockd:get_acceptors(Pid)},
|
||||||
|
{max_clients, esockd:get_max_clients(Pid)},
|
||||||
|
{current_clients, esockd:get_current_clients(Pid)},
|
||||||
|
{shutdown_count, esockd:get_shutdown_count(Pid)}]}
|
||||||
|
end, esockd:listeners()),
|
||||||
|
?assertEqual(1, proplists:get_value(current_clients, proplists:get_value("mqtt:tcp:1883", Listeners))),
|
||||||
|
?assertEqual([{conflict,1}], proplists:get_value(shutdown_count, proplists:get_value("mqtt:tcp:1883", Listeners))),
|
||||||
|
emqttc:disconnect(C2).
|
||||||
|
|
||||||
cli_vm(_) ->
|
cli_vm(_) ->
|
||||||
emqttd_cli:vm([]),
|
emqttd_cli:vm([]),
|
||||||
emqttd_cli:vm(["ports"]).
|
emqttd_cli:vm(["ports"]).
|
||||||
|
@ -630,7 +662,6 @@ cleanSession_validate(_) ->
|
||||||
{clean_sess, false}]),
|
{clean_sess, false}]),
|
||||||
timer:sleep(10),
|
timer:sleep(10),
|
||||||
emqttc:subscribe(C1, <<"topic">>, qos0),
|
emqttc:subscribe(C1, <<"topic">>, qos0),
|
||||||
ok = emqttd_cli:sessions(["list", "persistent"]),
|
|
||||||
emqttc:disconnect(C1),
|
emqttc:disconnect(C1),
|
||||||
{ok, Pub} = emqttc:start_link([{host, "localhost"},
|
{ok, Pub} = emqttc:start_link([{host, "localhost"},
|
||||||
{port, 1883},
|
{port, 1883},
|
||||||
|
@ -644,7 +675,6 @@ cleanSession_validate(_) ->
|
||||||
{clean_sess, false}]),
|
{clean_sess, false}]),
|
||||||
timer:sleep(100),
|
timer:sleep(100),
|
||||||
Metrics = emqttd_metrics:all(),
|
Metrics = emqttd_metrics:all(),
|
||||||
ct:log("Metrics:~p~n", [Metrics]),
|
|
||||||
?assertEqual(1, proplists:get_value('messages/qos0/sent', Metrics)),
|
?assertEqual(1, proplists:get_value('messages/qos0/sent', Metrics)),
|
||||||
?assertEqual(1, proplists:get_value('messages/qos0/received', Metrics)),
|
?assertEqual(1, proplists:get_value('messages/qos0/received', Metrics)),
|
||||||
emqttc:disconnect(Pub),
|
emqttc:disconnect(Pub),
|
||||||
|
@ -657,7 +687,6 @@ cleanSession_validate1(_) ->
|
||||||
{clean_sess, true}]),
|
{clean_sess, true}]),
|
||||||
timer:sleep(10),
|
timer:sleep(10),
|
||||||
emqttc:subscribe(C1, <<"topic">>, qos1),
|
emqttc:subscribe(C1, <<"topic">>, qos1),
|
||||||
ok = emqttd_cli:sessions(["list", "transient"]),
|
|
||||||
emqttc:disconnect(C1),
|
emqttc:disconnect(C1),
|
||||||
{ok, Pub} = emqttc:start_link([{host, "localhost"},
|
{ok, Pub} = emqttc:start_link([{host, "localhost"},
|
||||||
{port, 1883},
|
{port, 1883},
|
||||||
|
|
Loading…
Reference in New Issue