|
|
|
@ -64,12 +64,12 @@ end_per_suite(_Config) ->
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
subscribe_unsubscribe(_) ->
|
|
|
|
|
ok = emqx:subscribe(<<"topic">>, <<"clientId">>),
|
|
|
|
|
ok = emqx:subscribe(<<"topic/1">>, <<"clientId">>, [{qos, 1}]),
|
|
|
|
|
ok = emqx:subscribe(<<"topic/2">>, <<"clientId">>, [{qos, 2}]),
|
|
|
|
|
ok = emqx:unsubscribe(<<"topic">>, <<"clientId">>),
|
|
|
|
|
ok = emqx:unsubscribe(<<"topic/1">>, <<"clientId">>),
|
|
|
|
|
ok = emqx:unsubscribe(<<"topic/2">>, <<"clientId">>).
|
|
|
|
|
ok = emqx:subscribe(<<"topic">>, "clientId"),
|
|
|
|
|
ok = emqx:subscribe(<<"topic/1">>, "clientId", #{ qos => 1 }),
|
|
|
|
|
ok = emqx:subscribe(<<"topic/2">>, "clientId", #{ qos => 2 }),
|
|
|
|
|
ok = emqx:unsubscribe(<<"topic">>, "clientId"),
|
|
|
|
|
ok = emqx:unsubscribe(<<"topic/1">>, "clientId"),
|
|
|
|
|
ok = emqx:unsubscribe(<<"topic/2">>, "clientId").
|
|
|
|
|
|
|
|
|
|
publish(_) ->
|
|
|
|
|
Msg = emqx_message:make(ct, <<"test/pubsub">>, <<"hello">>),
|
|
|
|
@ -80,13 +80,17 @@ publish(_) ->
|
|
|
|
|
|
|
|
|
|
pubsub(_) ->
|
|
|
|
|
Self = self(),
|
|
|
|
|
ok = emqx:subscribe(<<"a/b/c">>, Self, [{qos, 1}]),
|
|
|
|
|
?assertMatch({error, _}, emqx:subscribe(<<"a/b/c">>, Self, [{qos, 2}])),
|
|
|
|
|
Subscriber = {Self, <<"clientId">>},
|
|
|
|
|
ok = emqx:subscribe(<<"a/b/c">>, Subscriber, #{ qos => 1 }),
|
|
|
|
|
#{ qos := 1} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2),
|
|
|
|
|
ok = emqx:subscribe(<<"a/b/c">>, Subscriber, #{ qos => 2 }),
|
|
|
|
|
#{ qos := 2} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2),
|
|
|
|
|
%% ct:log("Emq Sub: ~p.~n", [ets:lookup(emqx_suboption, {<<"a/b/c">>, Subscriber})]),
|
|
|
|
|
timer:sleep(10),
|
|
|
|
|
[{Self, <<"a/b/c">>}] = ets:lookup(mqtt_subscription, Self),
|
|
|
|
|
[{<<"a/b/c">>, Self}] = ets:lookup(mqtt_subscriber, <<"a/b/c">>),
|
|
|
|
|
[{<<"a/b/c">>, #{qos := 2}}] = emqx_broker:subscriptions(Subscriber),
|
|
|
|
|
[{Self, <<"clientId">>}] = emqx_broker:subscribers(<<"a/b/c">>),
|
|
|
|
|
emqx:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)),
|
|
|
|
|
?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end),
|
|
|
|
|
?assert(receive {dispatch, <<"a/b/c">>, _ } -> true; P -> ct:log("Receive Message: ~p~n",[P]) after 2 -> false end),
|
|
|
|
|
spawn(fun() ->
|
|
|
|
|
emqx:subscribe(<<"a/b/c">>),
|
|
|
|
|
emqx:subscribe(<<"c/d/e">>),
|
|
|
|
@ -97,32 +101,33 @@ pubsub(_) ->
|
|
|
|
|
emqx:unsubscribe(<<"a/b/c">>).
|
|
|
|
|
|
|
|
|
|
t_local_subscribe(_) ->
|
|
|
|
|
ok = emqx:subscribe("$local/topic0"),
|
|
|
|
|
ok = emqx:subscribe("$local/topic1", <<"x">>),
|
|
|
|
|
ok = emqx:subscribe("$local/topic2", <<"x">>, [{qos, 2}]),
|
|
|
|
|
ok = emqx:subscribe(<<"$local/topic0">>),
|
|
|
|
|
ok = emqx:subscribe(<<"$local/topic1">>, "clientId"),
|
|
|
|
|
ok = emqx:subscribe(<<"$local/topic2">>, "clientId", #{ qos => 2 }),
|
|
|
|
|
timer:sleep(10),
|
|
|
|
|
?assertEqual([self()], emqx:subscribers("$local/topic0")),
|
|
|
|
|
?assertEqual([{<<"x">>, self()}], emqx:subscribers("$local/topic1")),
|
|
|
|
|
?assertEqual([{{<<"x">>, self()}, <<"$local/topic1">>, []},
|
|
|
|
|
{{<<"x">>, self()}, <<"$local/topic2">>, [{qos,2}]}],
|
|
|
|
|
emqx:subscriptions(<<"x">>)),
|
|
|
|
|
?assertEqual([{self(), undefined}], emqx:subscribers("$local/topic0")),
|
|
|
|
|
?assertEqual([{self(), <<"clientId">>}], emqx:subscribers("$local/topic1")),
|
|
|
|
|
?assertEqual([{<<"$local/topic1">>, #{}},
|
|
|
|
|
{<<"$local/topic2">>, #{ qos => 2 }}],
|
|
|
|
|
emqx:subscriptions({self(), <<"clientId">>})),
|
|
|
|
|
?assertEqual(ok, emqx:unsubscribe("$local/topic0")),
|
|
|
|
|
?assertMatch({error, {subscription_not_found, _}}, emqx:unsubscribe("$local/topic0")),
|
|
|
|
|
?assertEqual(ok, emqx:unsubscribe("$local/topic1", <<"x">>)),
|
|
|
|
|
?assertEqual(ok, emqx:unsubscribe("$local/topic2", <<"x">>)),
|
|
|
|
|
?assertEqual(ok, emqx:unsubscribe("$local/topic0")),
|
|
|
|
|
?assertEqual(ok, emqx:unsubscribe("$local/topic1", "clientId")),
|
|
|
|
|
?assertEqual(ok, emqx:unsubscribe("$local/topic2", "clientId")),
|
|
|
|
|
?assertEqual([], emqx:subscribers("topic1")),
|
|
|
|
|
?assertEqual([], emqx:subscriptions(<<"x">>)).
|
|
|
|
|
?assertEqual([], emqx:subscriptions({self(), <<"clientId">>})).
|
|
|
|
|
|
|
|
|
|
t_shared_subscribe(_) ->
|
|
|
|
|
emqx:subscribe("$local/$share/group1/topic1"),
|
|
|
|
|
emqx:subscribe("$share/group2/topic2"),
|
|
|
|
|
emqx:subscribe("$queue/topic3"),
|
|
|
|
|
timer:sleep(10),
|
|
|
|
|
?assertEqual([self()], emqx:subscribers(<<"$local/$share/group1/topic1">>)),
|
|
|
|
|
?assertEqual([{self(), <<"$local/$share/group1/topic1">>, []},
|
|
|
|
|
{self(), <<"$queue/topic3">>, []},
|
|
|
|
|
{self(), <<"$share/group2/topic2">>, []}],
|
|
|
|
|
lists:sort(emqx:subscriptions(self()))),
|
|
|
|
|
ct:log("share subscriptions: ~p~n", [emqx:subscriptions({self(), undefined})]),
|
|
|
|
|
?assertEqual([{self(), undefined}], emqx:subscribers(<<"$local/$share/group1/topic1">>)),
|
|
|
|
|
?assertEqual([{<<"$local/$share/group1/topic1">>, #{}},
|
|
|
|
|
{<<"$queue/topic3">>, #{}},
|
|
|
|
|
{<<"$share/group2/topic2">>, #{}}],
|
|
|
|
|
lists:sort(emqx:subscriptions({self(), undefined}))),
|
|
|
|
|
emqx:unsubscribe("$local/$share/group1/topic1"),
|
|
|
|
|
emqx:unsubscribe("$share/group2/topic2"),
|
|
|
|
|
emqx:unsubscribe("$queue/topic3"),
|
|
|
|
@ -146,17 +151,18 @@ t_shared_subscribe(_) ->
|
|
|
|
|
%% Session Group
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
start_session(_) ->
|
|
|
|
|
{ok, ClientPid} = emqx_mock_client:start_link(<<"clientId">>),
|
|
|
|
|
{ok, SessPid} = emqx_mock_client:start_session(ClientPid),
|
|
|
|
|
Message = emqx_message:make(<<"clientId">>, 2, <<"topic">>, <<"hello">>),
|
|
|
|
|
Message1 = Message#message{id = 1},
|
|
|
|
|
emqx_session:publish(SessPid, Message1),
|
|
|
|
|
emqx_session:pubrel(SessPid, 1),
|
|
|
|
|
emqx_session:subscribe(SessPid, [{<<"topic/session">>, [{qos, 2}]}]),
|
|
|
|
|
ClientId = <<"clientId">>,
|
|
|
|
|
{ok, ClientPid} = emqx_mock_client:start_link(ClientId),
|
|
|
|
|
{ok, SessPid} = emqx_mock_client:open_session(ClientPid, ClientId, internal),
|
|
|
|
|
Message1 = emqx_message:make(<<"clientId">>, 2, <<"topic">>, <<"hello">>),
|
|
|
|
|
emqx_session:publish(SessPid, 1, Message1),
|
|
|
|
|
emqx_session:pubrel(SessPid, 2, reasoncode),
|
|
|
|
|
emqx_session:subscribe(SessPid, [{<<"topic/session">>, #{qos => 2}}]),
|
|
|
|
|
Message2 = emqx_message:make(<<"clientId">>, 1, <<"topic/session">>, <<"test">>),
|
|
|
|
|
emqx_session:publish(SessPid, Message2),
|
|
|
|
|
emqx_session:publish(SessPid, 3, Message2),
|
|
|
|
|
emqx_session:unsubscribe(SessPid, [{<<"topic/session">>, []}]),
|
|
|
|
|
emqx_mock_client:stop(ClientPid).
|
|
|
|
|
%% emqx_mock_client:stop(ClientPid).
|
|
|
|
|
emqx_mock_client:close_session(ClientPid, SessPid).
|
|
|
|
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
%% Broker Group
|
|
|
|
@ -231,10 +237,10 @@ hook_fun8(arg, initArg) -> stop.
|
|
|
|
|
|
|
|
|
|
set_alarms(_) ->
|
|
|
|
|
AlarmTest = #alarm{id = <<"1">>, severity = error, title="alarm title", summary="alarm summary"},
|
|
|
|
|
emqx_alarm:set_alarm(AlarmTest),
|
|
|
|
|
Alarms = emqx_alarm:get_alarms(),
|
|
|
|
|
emqx_alarm_mgr:set_alarm(AlarmTest),
|
|
|
|
|
Alarms = emqx_alarm_mgr:get_alarms(),
|
|
|
|
|
ct:log("Alarms Length: ~p ~n", [length(Alarms)]),
|
|
|
|
|
?assertEqual(1, length(Alarms)),
|
|
|
|
|
emqx_alarm:clear_alarm(<<"1">>),
|
|
|
|
|
[] = emqx_alarm:get_alarms().
|
|
|
|
|
|
|
|
|
|
emqx_alarm_mgr:clear_alarm(<<"1">>),
|
|
|
|
|
[] = emqx_alarm_mgr:get_alarms().
|
|
|
|
|
|
|
|
|
|