%%--------------------------------------------------------------------
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqttd_SUITE).

-compile(export_all).

-include("emqttd.hrl").

-include_lib("eunit/include/eunit.hrl").

-include_lib("common_test/include/ct.hrl").

%% Content type sent with the HTTP publish request.
-define(CONTENT_TYPE, "application/x-www-form-urlencoded").

%% Extra listener SSL options merged in by change_opts/3 for ssl_twoway.
-define(MQTT_SSL_TWOWAY, [{cacertfile, "certs/cacert.pem"},
                          {verify, verify_peer},
                          {fail_if_no_peer_cert, true}]).

%% Client-side certificate files (relative to data_dir) used by
%% mqtt_ssl_twoway/1.
-define(MQTT_SSL_CLIENT, [{keyfile, "certs/client-key.pem"},
                          {cacertfile, "certs/cacert.pem"},
                          {certfile, "certs/client-cert.pem"}]).

%% Common Test callback: every test group of the suite, in run order.
all() ->
    [{group, Group} || Group <- [protocol, pubsub, router, session, broker,
                                 metrics, stats, hook, http, cluster,
                                 alarms, cli, cleanSession]].
%% Common Test callback: the test cases of each group. Every group runs
%% its cases in sequence.
groups() ->
    [{protocol, [sequence],
      [mqtt_connect,
       mqtt_ssl_oneway,
       mqtt_ssl_twoway]},
     {pubsub, [sequence],
      [subscribe_unsubscribe,
       publish,
       pubsub,
       t_local_subscribe,
       t_shared_subscribe,
       'pubsub#',
       'pubsub+']},
     {router, [sequence],
      [router_add_del,
       router_print,
       router_unused]},
     {session, [sequence],
      [start_session]},
     {broker, [sequence],
      [hook_unhook]},
     {metrics, [sequence],
      [inc_dec_metric]},
     {stats, [sequence],
      [set_get_stat]},
     {hook, [sequence],
      [add_delete_hook,
       run_hooks]},
     {http, [sequence],
      [request_status,
       request_publish
       % websocket_test
      ]},
     {cluster, [sequence],
      [cluster_test,
       cluster_join,
       cluster_leave,
       cluster_remove,
       cluster_remove2,
       cluster_node_down]},
     {alarms, [sequence],
      [set_alarms]},
     {cli, [sequence],
      [ctl_register_cmd,
       cli_status,
       cli_broker,
       cli_clients,
       cli_sessions,
       cli_routes,
       cli_topics,
       cli_subscriptions,
       cli_bridges,
       cli_plugins,
       cli_listeners,
       cli_vm]},
     %% Only test cases that are defined in this suite are listed; a
     %% reference to an undefined function makes Common Test report a
     %% failing case.
     {cleanSession, [sequence],
      [cleanSession_validate,
       cleanSession_validate1]}].

%% Suite setup: builds the emqttd application environment from the
%% schema/conf files in data_dir (with one-way SSL listener paths made
%% absolute), starts emqttd and stores the generated config under the
%% `config' key for later test cases.
init_per_suite(Config) ->
    application:start(lager),
    DataDir = proplists:get_value(data_dir, Config),
    NewConfig = emqttd_config(DataDir),
    Vals = change_opts(ssl_oneway, DataDir, proplists:get_value(emqttd, NewConfig)),
    lists:foreach(fun({Par, Value}) ->
                          application:set_env(emqttd, Par, Value)
                  end, Vals),
    %% Fail the suite right here if the broker cannot be started, rather
    %% than letting every test case fail later for an unrelated-looking
    %% reason.
    {ok, _Started} = application:ensure_all_started(emqttd),
    [{config, NewConfig} | Config].

%% Suite teardown: stops the applications started for the suite and
%% makes sure mnesia is stopped.
end_per_suite(_Config) ->
    application:stop(emqttd),
    application:stop(esockd),
    application:stop(gproc),
    emqttd_mnesia:ensure_stopped().

%%--------------------------------------------------------------------
%% Protocol Test
%%--------------------------------------------------------------------

%% Sends raw CONNECT packets with an empty client id and checks the
%% 4-byte reply from the broker for both clean_session settings.
mqtt_connect(_) ->
    %% Issue #599
    %% Empty clientId and clean_session = false
    ?assertEqual(<<32,2,0,2>>, connect_broker_(<<16,12,0,4,77,81,84,84,4,0,0,90,0,0>>, 4)),
    %% Empty clientId and clean_session = true
    ?assertEqual(<<32,2,0,0>>, connect_broker_(<<16,12,0,4,77,81,84,84,4,2,0,90,0,0>>, 4)).
%% Opens a raw TCP connection to the local broker on port 1883, sends
%% Packet and returns the first RecvSize bytes of the reply (3 s receive
%% timeout). The socket is closed even when the send or receive fails.
connect_broker_(Packet, RecvSize) ->
    {ok, Sock} = gen_tcp:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}]),
    try
        ok = gen_tcp:send(Sock, Packet),
        {ok, Data} = gen_tcp:recv(Sock, RecvSize, 3000),
        Data
    after
        gen_tcp:close(Sock)
    end.

%% Subscribes through an SSL client on port 8883, publishes through a
%% second client started without an explicit port, and requires the
%% payload to reach this process.
mqtt_ssl_oneway(_) ->
    {ok, SslOneWay} = emqttc:start_link([{host, "localhost"},
                                         {port, 8883},
                                         {client_id, <<"ssloneway">>}, ssl]),
    timer:sleep(10),
    emqttc:subscribe(SslOneWay, <<"topic">>, qos1),
    {ok, Pub} = emqttc:start_link([{host, "localhost"},
                                   {client_id, <<"pub">>}]),
    emqttc:publish(Pub, <<"topic">>, <<"SSL oneWay test">>, [{qos, 1}]),
    timer:sleep(10),
    expect_publish_(<<"SSL oneWay test">>),
    emqttc:disconnect(SslOneWay),
    emqttc:disconnect(Pub).

%% Reconfigures the SSL listener for two-way (peer-verified) SSL,
%% reboots the broker, publishes through an SSL client that presents the
%% client certificate and requires the payload to reach this process via
%% a second, subscribed client.
mqtt_ssl_twoway(Config) ->
    emqttd_cluster:prepare(),
    DataDir = proplists:get_value(data_dir, Config),
    EmqConfig = proplists:get_value(config, Config),
    Vals = change_opts(ssl_twoway, DataDir, proplists:get_value(emqttd, EmqConfig)),
    lists:foreach(fun({Par, Value}) ->
                          application:set_env(emqttd, Par, Value)
                  end, Vals),
    emqttd_cluster:reboot(),
    %% Certificate paths in ?MQTT_SSL_CLIENT are relative to data_dir.
    ClientSSl = [{Key, filename:join([DataDir, File])} || {Key, File} <- ?MQTT_SSL_CLIENT],
    {ok, SslTwoWay} = emqttc:start_link([{host, "localhost"},
                                         {port, 8883},
                                         {client_id, <<"ssltwoway">>},
                                         {ssl, ClientSSl}]),
    {ok, Sub} = emqttc:start_link([{host, "localhost"},
                                   {client_id, <<"sub">>}]),
    emqttc:subscribe(Sub, <<"topic">>, qos1),
    %% Short pause after subscribing, in the same spirit as the pauses in
    %% mqtt_ssl_oneway/1, so the subscription has a chance to be
    %% registered before the publish now that a missing delivery is a
    %% test failure.
    timer:sleep(10),
    emqttc:publish(SslTwoWay, <<"topic">>, <<"ssl client pub message">>, [{qos, 1}]),
    timer:sleep(10),
    expect_publish_(<<"ssl client pub message">>),
    emqttc:disconnect(SslTwoWay),
    emqttc:disconnect(Sub).

%% Waits up to one second for a {publish, Topic, Payload} message in the
%% calling process and asserts that the payload equals Expected. A
%% timeout fails the test case instead of being silently ignored.
expect_publish_(Expected) ->
    receive
        {publish, _Topic, RM} ->
            ?assertEqual(Expected, RM)
    after 1000 ->
        ct:fail({publish_not_received, Expected})
    end.
%%--------------------------------------------------------------------
%% PubSub Test
%%--------------------------------------------------------------------

%% Subscribes one client id to three topics (default options, qos 1,
%% qos 2) and unsubscribes it again; every call must return ok.
subscribe_unsubscribe(_) ->
    ClientId = <<"clientId">>,
    ok = emqttd:subscribe(<<"topic">>, ClientId),
    ok = emqttd:subscribe(<<"topic/1">>, ClientId, [{qos, 1}]),
    ok = emqttd:subscribe(<<"topic/2">>, ClientId, [{qos, 2}]),
    lists:foreach(fun(Topic) ->
                          ok = emqttd:unsubscribe(Topic, ClientId)
                  end, [<<"topic">>, <<"topic/1">>, <<"topic/2">>]).

%% Publishes to <<"test/pubsub">> and expects the message to be
%% dispatched to this process through the <<"test/+">> subscription
%% within 5 ms.
publish(_) ->
    Msg = emqttd_message:make(ct, <<"test/pubsub">>, <<"hello">>),
    ok = emqttd:subscribe(<<"test/+">>),
    timer:sleep(10),
    emqttd:publish(Msg),
    Dispatched = receive
                     {dispatch, <<"test/+">>, Msg} -> true
                 after 5 ->
                     false
                 end,
    ?assert(Dispatched).

%% Subscribes this process to <<"a/b/c">>, checks that a second
%% subscribe to the same topic is rejected, inspects the subscription
%% ETS tables, verifies dispatch, and finally lets another process
%% subscribe/unsubscribe concurrently.
pubsub(_) ->
    Self = self(),
    Topic = <<"a/b/c">>,
    ok = emqttd:subscribe(Topic, Self, [{qos, 1}]),
    ?assertMatch({error, _}, emqttd:subscribe(<<"a/b/c">>, Self, [{qos, 2}])),
    timer:sleep(10),
    [{Self, Topic}] = ets:lookup(mqtt_subscription, Self),
    [{Topic, Self}] = ets:lookup(mqtt_subscriber, Topic),
    emqttd:publish(emqttd_message:make(ct, Topic, <<"hello">>)),
    Dispatched = receive
                     {dispatch, <<"a/b/c">>, _} -> true
                 after 2 ->
                     false
                 end,
    ?assert(Dispatched),
    spawn(fun() ->
                  emqttd:subscribe(<<"a/b/c">>),
                  emqttd:subscribe(<<"c/d/e">>),
                  timer:sleep(10),
                  emqttd:unsubscribe(<<"a/b/c">>)
          end),
    timer:sleep(20),
    emqttd:unsubscribe(Topic).
%% Exercises subscribe/subscribers/subscriptions/unsubscribe with
%% "$local/..." topics, both for the calling process and for the
%% subscriber <<"x">>.
t_local_subscribe(_) ->
    Subscriber = <<"x">>,
    emqttd:subscribe("$local/topic0"),
    emqttd:subscribe("$local/topic1", Subscriber),
    emqttd:subscribe("$local/topic2", Subscriber, [{qos, 2}]),
    timer:sleep(10),
    ?assertEqual([self()], emqttd:subscribers("$local/topic0")),
    ?assertEqual([<<"x">>], emqttd:subscribers("$local/topic1")),
    ?assertEqual([{<<"$local/topic1">>,<<"x">>,[]},{<<"$local/topic2">>,<<"x">>,[{qos,2}]}],
                 emqttd:subscriptions(Subscriber)),

    %% A second unsubscribe of the same topic must report that the
    %% subscription no longer exists.
    ?assertEqual(ok, emqttd:unsubscribe("$local/topic0")),
    ?assertMatch({error, {subscription_not_found, _}}, emqttd:unsubscribe("$local/topic0")),
    ?assertEqual(ok, emqttd:unsubscribe("$local/topic1", Subscriber)),
    ?assertEqual(ok, emqttd:unsubscribe("$local/topic2", Subscriber)),
    ?assertEqual([], emqttd:subscribers("topic1")),
    ?assertEqual([], emqttd:subscriptions(Subscriber)).

%% Subscribes the calling process to "$local/$share", "$share" and
%% "$queue" topics, checks the recorded subscriptions and removes them.
t_shared_subscribe(_) ->
    Self = self(),
    emqttd:subscribe("$local/$share/group1/topic1"),
    emqttd:subscribe("$share/group2/topic2"),
    emqttd:subscribe("$queue/topic3"),
    timer:sleep(10),
    ?assertEqual([Self], emqttd:subscribers(<<"$local/$share/group1/topic1">>)),
    Expected = [{<<"$local/$share/group1/topic1">>, Self, []},
                {<<"$queue/topic3">>, Self, []},
                {<<"$share/group2/topic2">>, Self, []}],
    ?assertEqual(Expected, lists:sort(emqttd:subscriptions(Self))),
    emqttd:unsubscribe("$local/$share/group1/topic1"),
    emqttd:unsubscribe("$share/group2/topic2"),
    emqttd:unsubscribe("$queue/topic3"),
    ?assertEqual([], lists:sort(emqttd:subscriptions(Self))).

%% A message published to <<"a/b/c">> must be dispatched through the
%% <<"a/#">> subscription within 2 ms.
'pubsub#'(_) ->
    Filter = <<"a/#">>,
    emqttd:subscribe(Filter),
    timer:sleep(10),
    emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)),
    Dispatched = receive
                     {dispatch, <<"a/#">>, _} -> true
                 after 2 ->
                     false
                 end,
    ?assert(Dispatched),
    emqttd:unsubscribe(Filter).

%% A message published to <<"a/b/c">> must be dispatched through the
%% <<"a/+/+">> subscription within 1 ms.
'pubsub+'(_) ->
    Filter = <<"a/+/+">>,
    emqttd:subscribe(Filter),
    timer:sleep(10),
    emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)),
    Dispatched = receive
                     {dispatch, <<"a/+/+">>, _} -> true
                 after 1 ->
                     false
                 end,
    ?assert(Dispatched),
    emqttd:unsubscribe(Filter).

%% Collects dispatched messages for Topic, starting from an empty
%% accumulator; see loop_recv/3.
loop_recv(Topic, Timeout) ->
    loop_recv(Topic, Timeout, []).
%% Accumulates {dispatch, Topic, Msg} messages from the mailbox until
%% none arrives for Timeout ms. Returns {ok, Msgs} with the most
%% recently received message first.
loop_recv(Topic, Timeout, Acc) ->
    receive
        {dispatch, Topic, Received} ->
            loop_recv(Topic, Timeout, [Received | Acc])
    after Timeout ->
        {ok, Acc}
    end.

%%--------------------------------------------------------------------
%% Router Test
%%--------------------------------------------------------------------

%% Adds, matches and deletes routes one by one and in batches, checking
%% the match result for <<"a/b/c">> after each step.
router_add_del(_) ->
    Topic = <<"a/b/c">>,
    %% Add
    emqttd_router:add_route(<<"#">>),
    emqttd_router:add_route(Topic),
    emqttd_router:add_route(<<"+/#">>, node()),
    Routes = [#mqtt_route{topic = Filter, node = node()}
              || Filter <- [<<"#">>, <<"+/#">>, <<"a/b/c">>]],
    [R1, R2 | _] = Routes,
    Routes = lists:sort(emqttd_router:match(Topic)),
    %% Batch Add
    emqttd_router:add_routes(Routes),
    Routes = lists:sort(emqttd_router:match(Topic)),
    %% Del
    emqttd_router:del_route(Topic),
    [R1, R2] = lists:sort(emqttd_router:match(Topic)),
    {atomic, []} = mnesia:transaction(fun emqttd_trie:lookup/1, [Topic]),
    %% Batch Del
    R3 = #mqtt_route{topic = <<"#">>, node = 'a@127.0.0.1'},
    emqttd_router:add_route(R3),
    emqttd_router:del_routes([R1, R2]),
    emqttd_router:del_route(R3),
    [] = lists:sort(emqttd_router:match(Topic)).

%% Adds three routes for the local node and calls the router's print
%% function for <<"a/b/c">>.
router_print(_) ->
    Node = node(),
    Routes = [#mqtt_route{topic = <<"a/b/c">>, node = Node},
              #mqtt_route{topic = <<"#">>,     node = Node},
              #mqtt_route{topic = <<"+/#">>,   node = Node}],
    emqttd_router:add_routes(Routes),
    emqttd_router:print(<<"a/b/c">>).

%% Sends an unexpected call, cast and plain message to the router
%% process.
router_unused(_) ->
    Router = emqttd_router,
    gen_server:call(Router, bad_call),
    gen_server:cast(Router, bad_msg),
    Router ! bad_info.

%% Drains {dispatch, _, Msg} messages until the mailbox is quiet for
%% 100 ms and returns the collected messages in arrival order.
recv_loop(Msgs) ->
    receive
        {dispatch, _Topic, Received} ->
            recv_loop([Received | Msgs])
    after 100 ->
        lists:reverse(Msgs)
    end.
%%--------------------------------------------------------------------
%% Session Group
%%--------------------------------------------------------------------

%% Starts a mock client with a session, then drives the session through
%% publish, pubrel, subscribe, a second publish and unsubscribe before
%% stopping the client.
start_session(_) ->
    ClientId = <<"clientId">>,
    SessionTopic = <<"topic/session">>,
    {ok, ClientPid} = emqttd_mock_client:start_link(ClientId),
    {ok, SessPid} = emqttd_mock_client:start_session(ClientPid),
    Qos2Msg = emqttd_message:make(ClientId, 2, <<"topic">>, <<"hello">>),
    emqttd_session:publish(SessPid, Qos2Msg#mqtt_message{pktid = 1}),
    emqttd_session:pubrel(SessPid, 1),
    emqttd_session:subscribe(SessPid, [{SessionTopic, [{qos, 2}]}]),
    Qos1Msg = emqttd_message:make(ClientId, 1, SessionTopic, <<"test">>),
    emqttd_session:publish(SessPid, Qos1Msg),
    emqttd_session:unsubscribe(SessPid, [{SessionTopic, []}]),
    emqttd_mock_client:stop(ClientPid).

%%--------------------------------------------------------------------
%% Broker Group
%%--------------------------------------------------------------------

%% Placeholder test case; performs no checks.
hook_unhook(_) ->
    ok.

%%--------------------------------------------------------------------
%% Metric Group
%%--------------------------------------------------------------------

%% Increments and then decrements the 'messages/retained' gauge by 10.
inc_dec_metric(_) ->
    Metric = 'messages/retained',
    emqttd_metrics:inc(gauge, Metric, 10),
    emqttd_metrics:dec(gauge, Metric, 10).

%%--------------------------------------------------------------------
%% Stats Group
%%--------------------------------------------------------------------

%% Sets the 'retained/max' stat and reads the same value back.
set_get_stat(_) ->
    Stat = 'retained/max',
    emqttd_stats:setstat(Stat, 99),
    99 = emqttd_stats:getstat(Stat).
%%--------------------------------------------------------------------
%% Hook Test
%%--------------------------------------------------------------------

%% Registers and removes hook callbacks, with and without tags and
%% explicit priorities, checking the stored callback list after each
%% phase.
add_delete_hook(_) ->
    Fun1 = fun ?MODULE:hook_fun1/1,
    Fun2 = fun ?MODULE:hook_fun2/1,

    %% Default priority; a duplicate tagged registration is rejected.
    ok = emqttd:hook(test_hook, Fun1, []),
    ok = emqttd:hook(test_hook, {tag, Fun2}, []),
    {error, already_hooked} = emqttd:hook(test_hook, {tag, Fun2}, []),
    Callbacks = [{callback, undefined, Fun1, [], 0},
                 {callback, tag, Fun2, [], 0}],
    Callbacks = emqttd_hooks:lookup(test_hook),
    ok = emqttd:unhook(test_hook, Fun1),
    ct:print("Callbacks: ~p~n", [emqttd_hooks:lookup(test_hook)]),
    ok = emqttd:unhook(test_hook, {tag, Fun2}),
    {error, not_found} = emqttd:unhook(test_hook1, {tag, Fun2}),
    [] = emqttd_hooks:lookup(test_hook),

    %% Explicit priorities: the priority-8 callback is expected before
    %% the priority-9 one in the lookup result.
    ok = emqttd:hook(emqttd_hook, Fun1, [], 9),
    ok = emqttd:hook(emqttd_hook, {"tag", Fun2}, [], 8),
    Callbacks2 = [{callback, "tag", Fun2, [], 8},
                  {callback, undefined, Fun1, [], 9}],
    Callbacks2 = emqttd_hooks:lookup(emqttd_hook),
    ok = emqttd:unhook(emqttd_hook, Fun1),
    ok = emqttd:unhook(emqttd_hook, {"tag", Fun2}),
    [] = emqttd_hooks:lookup(emqttd_hook).
%% Runs an accumulating hook chain and a plain hook chain and checks
%% the results returned by emqttd:run_hooks.
run_hooks(_) ->
    Fun3 = fun ?MODULE:hook_fun3/4,
    ok = emqttd:hook(foldl_hook, Fun3, [init]),
    ok = emqttd:hook(foldl_hook, {tag, Fun3}, [init]),
    ok = emqttd:hook(foldl_hook, fun ?MODULE:hook_fun4/4, [init]),
    ok = emqttd:hook(foldl_hook, fun ?MODULE:hook_fun5/4, [init]),
    {stop, [r3, r2]} = emqttd:run_hooks(foldl_hook, [arg1, arg2], []),
    {ok, []} = emqttd:run_hooks(unknown_hook, [], []),

    Fun6 = fun ?MODULE:hook_fun6/2,
    ok = emqttd:hook(foreach_hook, Fun6, [initArg]),
    ok = emqttd:hook(foreach_hook, {tag, Fun6}, [initArg]),
    ok = emqttd:hook(foreach_hook, fun ?MODULE:hook_fun7/2, [initArg]),
    ok = emqttd:hook(foreach_hook, fun ?MODULE:hook_fun8/2, [initArg]),
    stop = emqttd:run_hooks(foreach_hook, [arg]).

%% Hook callbacks registered by add_delete_hook/1 and run_hooks/1.
%% Each one accepts only the exact arguments those tests pass.
hook_fun1([]) ->
    ok.

hook_fun2([]) ->
    {ok, []}.

hook_fun3(arg1, arg2, _Acc, init) ->
    ok.

hook_fun4(arg1, arg2, Acc, init) ->
    {ok, [r2 | Acc]}.

hook_fun5(arg1, arg2, Acc, init) ->
    {stop, [r3 | Acc]}.

hook_fun6(arg, initArg) ->
    ok.

hook_fun7(arg, initArg) ->
    any.

hook_fun8(arg, initArg) ->
    stop.

%%--------------------------------------------------------------------
%% HTTP Request Test
%%--------------------------------------------------------------------

%% Requests /status over HTTP and compares the body with a status text
%% built locally from the VM status and the emqttd application state.
request_status(_) ->
    {InternalStatus, _ProvidedStatus} = init:get_status(),
    AppStatus = case lists:keymember(emqttd, 1, application:which_applications()) of
                    true  -> running;
                    false -> not_running
                end,
    Status = iolist_to_binary(io_lib:format("Node ~s is ~s~nemqttd is ~s",
                                            [node(), InternalStatus, AppStatus])),
    Url = "http://127.0.0.1:8083/status",
    {ok, {{"HTTP/1.1", 200, "OK"}, _, Return}} = httpc:request(get, {Url, []}, [], []),
    ?assertEqual(binary_to_list(Status), Return).

%% Publishes through the HTTP API and expects the message to be
%% dispatched to this process within 2 ms of the request returning.
request_publish(_) ->
    ok = emqttd:subscribe(<<"a/b/c">>, self(), [{qos, 1}]),
    Params = "qos=1&retain=0&topic=a/b/c&message=hello",
    Accepted = connect_emqttd_publish_(post, "mqtt/publish", Params, auth_header_("", "")),
    ?assert(Accepted),
    Dispatched = receive
                     {dispatch, <<"a/b/c">>, _} -> true
                 after 2 ->
                     false
                 end,
    ?assert(Dispatched),
    emqttd:unsubscribe(<<"a/b/c">>).
%% Issues an HTTP request against the local API on port 8083. Returns
%% true for a 200 response and false for a remotely closed socket or an
%% empty-bodied 400/404 response; any other result is a case_clause
%% error.
connect_emqttd_publish_(Method, Api, Params, Auth) ->
    Url = "http://127.0.0.1:8083/" ++ Api,
    Request = {Url, [Auth], ?CONTENT_TYPE, Params},
    case httpc:request(Method, Request, [], []) of
        {error, socket_closed_remotely} ->
            false;
        {ok, {{"HTTP/1.1", 200, "OK"}, _, _Return}} ->
            true;
        {ok, {{"HTTP/1.1", 400, _}, _, []}} ->
            false;
        {ok, {{"HTTP/1.1", 404, _}, _, []}} ->
            false
    end.

%% Builds an HTTP Basic "Authorization" header tuple for User/Pass.
auth_header_(User, Pass) ->
    Credentials = User ++ ":" ++ Pass,
    {"Authorization", "Basic " ++ base64:encode_to_string(Credentials)}.

%% Builds a mochiweb GET /mqtt request carrying a Sec-WebSocket-Key
%% header and hands it to emqttd_http. Not listed in groups/0.
websocket_test(_) ->
    Conn = esockd_connection:new(esockd_transport, nil, []),
    Headers = mochiweb_headers:make([{"Sec-WebSocket-Key","Xn3fdKyc3qEXPuj2A3O+ZA=="}]),
    Req = mochiweb_request:new(Conn, 'GET', "/mqtt", {1, 1}, Headers),
    ct:log("Req:~p", [Req]),
    emqttd_http:handle_request(Req).

%%--------------------------------------------------------------------
%% cluster group
%%--------------------------------------------------------------------

%% Lets a slave node join this node's cluster and leave it again,
%% checking the running mnesia nodes after each step.
cluster_test(_Config) ->
    Z = slave(emqttd, cluster_test_z),
    wait_running(Z),
    true = emqttd:is_running(Z),
    Node = node(),
    ok = rpc:call(Z, emqttd_cluster, join, [Node]),
    [Z, Node] = lists:sort(mnesia:system_info(running_db_nodes)),
    ct:log("Z:~p, Node:~p", [Z, Node]),
    ok = rpc:call(Z, emqttd_cluster, leave, []),
    [Node] = lists:sort(mnesia:system_info(running_db_nodes)),
    ok = slave:stop(Z).

%% Joining must fail for the local node and for a node that does not
%% run emqttd, and succeed for a node that does.
cluster_join(_) ->
    Z = slave(emqttd, cluster_join_z),
    N = slave(node, cluster_join_n),
    wait_running(Z),
    true = emqttd:is_running(Z),
    Node = node(),
    {error, {cannot_join_with_self, Node}} = emqttd_cluster:join(Node),
    {error, {node_not_running, N}} = emqttd_cluster:join(N),
    ok = emqttd_cluster:join(Z),
    slave:stop(Z),
    slave:stop(N).

%% Leaving is an error while not clustered; after joining a slave,
%% leaving brings the running nodes back to just this node.
cluster_leave(_) ->
    Z = slave(emqttd, cluster_leave_z),
    wait_running(Z),
    {error, node_not_in_cluster} = emqttd_cluster:leave(),
    ok = emqttd_cluster:join(Z),
    Node = node(),
    [Z, Node] = emqttd_mnesia:running_nodes(),
    ok = emqttd_cluster:leave(),
    [Node] = emqttd_mnesia:running_nodes(),
    slave:stop(Z).
%% Removing the local node is rejected; removing a joined slave leaves
%% only this node running.
cluster_remove(_) ->
    Z = slave(emqttd, cluster_remove_z),
    wait_running(Z),
    Node = node(),
    {error, {cannot_remove_self, Node}} = emqttd_cluster:remove(Node),
    ok = emqttd_cluster:join(Z),
    [Z, Node] = emqttd_mnesia:running_nodes(),
    ok = emqttd_cluster:remove(Z),
    [Node] = emqttd_mnesia:running_nodes(),
    slave:stop(Z).

%% Same as cluster_remove/1, but additionally stops mnesia on the
%% removed node before checking the running nodes.
cluster_remove2(_) ->
    Z = slave(emqttd, cluster_remove2_z),
    wait_running(Z),
    ok = emqttd_cluster:join(Z),
    Node = node(),
    [Z, Node] = emqttd_mnesia:running_nodes(),
    ok = emqttd_cluster:remove(Z),
    ok = rpc:call(Z, emqttd_mnesia, ensure_stopped, []),
    [Node] = emqttd_mnesia:running_nodes(),
    slave:stop(Z).

%% Routes added on a joined slave must be matched from this node, and
%% must no longer be matched one second after the slave is stopped.
cluster_node_down(_) ->
    Z = slave(emqttd, cluster_node_down),
    timer:sleep(1000),
    wait_running(Z),
    ok = emqttd_cluster:join(Z),
    lists:foreach(fun(Filter) ->
                          ok = rpc:call(Z, emqttd_router, add_route, [Filter])
                  end, [<<"a/b/c">>, <<"#">>]),
    Routes = lists:sort(emqttd_router:match(<<"a/b/c">>)),
    ct:log("Routes: ~p~n", [Routes]),
    [<<"#">>, <<"a/b/c">>] = [Topic || #mqtt_route{topic = Topic} <- Routes],
    slave:stop(Z),
    timer:sleep(1000),
    [] = lists:sort(emqttd_router:match(<<"a/b/c">>)).

%% Sets one alarm, expects exactly one alarm to be reported, then
%% clears it and expects none.
set_alarms(_) ->
    AlarmId = <<"1">>,
    AlarmTest = #mqtt_alarm{id = AlarmId,
                            severity = error,
                            title = "alarm title",
                            summary = "alarm summary"},
    emqttd_alarm:set_alarm(AlarmTest),
    Alarms = emqttd_alarm:get_alarms(),
    ?assertEqual(1, length(Alarms)),
    emqttd_alarm:clear_alarm(AlarmId),
    [] = emqttd_alarm:get_alarms().

%%--------------------------------------------------------------------
%% Cli group
%%--------------------------------------------------------------------

%% Registers test_cmd/1 as a control command, looks it up, runs it with
%% two arguments and unregisters it.
ctl_register_cmd(_) ->
    Handler = {?MODULE, test_cmd},
    emqttd_ctl:register_cmd(test_cmd, Handler),
    erlang:yield(),
    timer:sleep(5),
    [Handler] = emqttd_ctl:lookup(test_cmd),
    emqttd_ctl:run(["test_cmd", "arg1", "arg2"]),
    emqttd_ctl:unregister_cmd(test_cmd).

%% Command handler registered by ctl_register_cmd/1.
test_cmd(["arg1", "arg2"]) ->
    ct:print("test_cmd is called");
test_cmd([]) ->
    io:format("test command").

%% Calls the CLI status command with no arguments.
cli_status(_) ->
    emqttd_cli:status([]).
%% The cli_* cases below call emqttd_cli entry points with fixed
%% argument lists; none of them inspects the results.

%% broker command: no arguments, then each sub-command.
cli_broker(_) ->
    emqttd_cli:broker([]),
    emqttd_cli:broker(["stats"]),
    emqttd_cli:broker(["metrics"]),
    emqttd_cli:broker(["pubsub"]).

%% clients command: list, show and kick for "clientId".
cli_clients(_) ->
    ClientId = "clientId",
    emqttd_cli:clients(["list"]),
    emqttd_cli:clients(["show", ClientId]),
    emqttd_cli:clients(["kick", ClientId]).

%% sessions command: the three list variants, then show for "clientId".
cli_sessions(_) ->
    emqttd_cli:sessions(["list"]),
    emqttd_cli:sessions(["list", "persistent"]),
    emqttd_cli:sessions(["list", "transient"]),
    emqttd_cli:sessions(["show", "clientId"]).

%% routes command, run while this process is subscribed to the topic.
cli_routes(_) ->
    Topic = <<"topic/route">>,
    emqttd:subscribe(Topic),
    emqttd_cli:routes(["list"]),
    emqttd_cli:routes(["show", "topic/route"]),
    emqttd:unsubscribe(Topic).

%% topics command, run while this process is subscribed to the topic.
cli_topics(_) ->
    Topic = <<"topic">>,
    emqttd:subscribe(Topic),
    emqttd_cli:topics(["list"]),
    emqttd_cli:topics(["show", "topic"]),
    emqttd:unsubscribe(Topic).

%% subscriptions command: list, show, add and del for "clientId".
cli_subscriptions(_) ->
    ClientId = "clientId",
    emqttd_cli:subscriptions(["list"]),
    emqttd_cli:subscriptions(["show", ClientId]),
    emqttd_cli:subscriptions(["add", ClientId, "topic", "2"]),
    emqttd_cli:subscriptions(["del", ClientId, "topic"]).

%% plugins command: list, load and unload the template plugin.
cli_plugins(_) ->
    Plugin = "emqttd_plugin_template",
    emqttd_cli:plugins(["list"]),
    emqttd_cli:plugins(["load", Plugin]),
    emqttd_cli:plugins(["unload", Plugin]).

%% bridges command: list, start and stop a bridge to 'a@127.0.0.1'.
cli_bridges(_) ->
    Peer = "a@127.0.0.1",
    emqttd_cli:bridges(["list"]),
    emqttd_cli:bridges(["start", Peer, "topic"]),
    emqttd_cli:bridges(["stop", Peer, "topic"]).

%% listeners command with no arguments.
cli_listeners(_) ->
    emqttd_cli:listeners([]).

%% vm command: no arguments, then "ports".
cli_vm(_) ->
    emqttd_cli:vm([]),
    emqttd_cli:vm(["ports"]).
%% Client "c1" connects with clean_sess = false, subscribes at qos0 and
%% disconnects; a publisher sends one qos0 message; "c1" reconnects with
%% clean_sess = false. The qos0 sent/received metrics must both be 1.
cleanSession_validate(_) ->
    Broker = [{host, "localhost"}, {port, 1883}],
    C1Opts = Broker ++ [{client_id, <<"c1">>}, {clean_sess, false}],
    {ok, C1} = emqttc:start_link(C1Opts),
    timer:sleep(10),
    emqttc:subscribe(C1, <<"topic">>, qos0),
    ok = emqttd_cli:sessions(["list", "persistent"]),
    emqttc:disconnect(C1),
    {ok, Pub} = emqttc:start_link(Broker ++ [{client_id, <<"pub">>}]),
    emqttc:publish(Pub, <<"topic">>, <<"m1">>, [{qos, 0}]),
    timer:sleep(10),
    {ok, C11} = emqttc:start_link(C1Opts),
    timer:sleep(100),
    Metrics = emqttd_metrics:all(),
    ct:log("Metrics:~p~n", [Metrics]),
    ?assertEqual(1, proplists:get_value('messages/qos0/sent', Metrics)),
    ?assertEqual(1, proplists:get_value('messages/qos0/received', Metrics)),
    emqttc:disconnect(Pub),
    emqttc:disconnect(C11).

%% Client "c1" connects with clean_sess = true, subscribes at qos1 and
%% disconnects; a publisher sends one qos1 message; "c1" reconnects with
%% clean_sess = false. The qos1 metrics must show 0 sent and 1 received.
cleanSession_validate1(_) ->
    Broker = [{host, "localhost"}, {port, 1883}],
    {ok, C1} = emqttc:start_link(Broker ++ [{client_id, <<"c1">>}, {clean_sess, true}]),
    timer:sleep(10),
    emqttc:subscribe(C1, <<"topic">>, qos1),
    ok = emqttd_cli:sessions(["list", "transient"]),
    emqttc:disconnect(C1),
    {ok, Pub} = emqttc:start_link(Broker ++ [{client_id, <<"pub">>}]),
    emqttc:publish(Pub, <<"topic">>, <<"m1">>, [{qos, 1}]),
    timer:sleep(10),
    {ok, C11} = emqttc:start_link(Broker ++ [{client_id, <<"c1">>}, {clean_sess, false}]),
    timer:sleep(100),
    Metrics = emqttd_metrics:all(),
    ?assertEqual(0, proplists:get_value('messages/qos1/sent', Metrics)),
    ?assertEqual(1, proplists:get_value('messages/qos1/received', Metrics)),
    emqttc:disconnect(Pub),
    emqttc:disconnect(C11).

%% Accepts `ok' or an already_started error and returns ok; anything
%% else is a function_clause error.
ensure_ok(ok) ->
    ok;
ensure_ok({error, {already_started, _}}) ->
    ok.

%% Host part of the local node name (the text after "@").
host() ->
    NodeName = atom_to_list(node()),
    [_, Host] = string:tokens(NodeName, "@"),
    Host.

%% Waits for emqttd to be running on Node with a 30000 ms budget; see
%% wait_running/2.
wait_running(Node) ->
    wait_running(Node, 30000).
%% Polls Node every 100 ms until emqttd:is_running(Node), evaluated on
%% that node, returns true. Throws {wait_timeout, Node} once the
%% remaining budget has dropped below zero.
wait_running(Node, Timeout) when Timeout < 0 ->
    throw({wait_timeout, Node});
wait_running(Node, Timeout) ->
    case rpc:call(Node, emqttd, is_running, [Node]) of
        true ->
            ok;
        false ->
            timer:sleep(100),
            wait_running(Node, Timeout - 100)
    end.

%% Starts a slave node on the local host with the project's code paths.
%% With `emqttd' the emqttd application (and its dependencies) is also
%% started on the new node; with `node' only the bare node is started.
%% Returns the node name.
slave(emqttd, Node) ->
    Emq = slave(node, Node),
    rpc:call(Emq, application, ensure_all_started, [emqttd]),
    Emq;
slave(node, Node) ->
    {ok, Started} = slave:start(host(), Node, "-pa ../../ebin -pa ../../deps/*/ebin"),
    Started.

%% Generates the application configuration from the cuttlefish schema
%% and conf files located in DataDir.
emqttd_config(DataDir) ->
    SchemaFile = filename:join([DataDir, "emqttd.schema"]),
    ConfFile = filename:join([DataDir, "emqttd.conf"]),
    Schema = cuttlefish_schema:files([SchemaFile]),
    Conf = conf_parse:file(ConfFile),
    cuttlefish_generator:map(Schema, Conf).

%% Rewrites the `listeners' entry of Vals:
%%   * for every ssl listener the keyfile/certfile paths are joined
%%     onto DataDir and its options are reduced to the single `ssl'
%%     option list;
%%   * when SslType is ssl_twoway the ?MQTT_SSL_TWOWAY options (with the
%%     cacertfile joined onto DataDir) are merged into that list;
%%   * other listeners are kept unchanged.
%% The resulting listener list is in reverse order of the input list.
change_opts(SslType, DataDir, Vals) ->
    InDataDir = fun(Key, Opts) ->
                        filename:join([DataDir, proplists:get_value(Key, Opts)])
                end,
    Rewrite =
        fun({ssl, Port, Opts}) ->
                SslOpts = proplists:get_value(ssl, Opts),
                Keyfile = InDataDir(keyfile, SslOpts),
                Certfile = InDataDir(certfile, SslOpts),
                WithKey = lists:keyreplace(keyfile, 1, SslOpts, {keyfile, Keyfile}),
                WithCert = lists:keyreplace(certfile, 1, WithKey, {certfile, Certfile}),
                FinalOpts =
                    case SslType of
                        ssl_twoway ->
                            CAfile = InDataDir(cacertfile, ?MQTT_SSL_TWOWAY),
                            TwoWayOpts = lists:keyreplace(cacertfile, 1, ?MQTT_SSL_TWOWAY,
                                                          {cacertfile, CAfile}),
                            lists:merge(WithCert, TwoWayOpts);
                        _ ->
                            WithCert
                    end,
                {ssl, Port, [{ssl, FinalOpts}]};
           ({_Protocol, _Port, _Opts} = Listener) ->
                Listener
        end,
    Listeners = proplists:get_value(listeners, Vals),
    NewListeners = lists:reverse(lists:map(Rewrite, Listeners)),
    lists:keyreplace(listeners, 1, Vals, {listeners, NewListeners}).