test(cluster-link): adapt extrouter testsuite to new APIs
This commit is contained in:
parent
45eda4f3b9
commit
036c7e8492
|
@ -8,9 +8,13 @@
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("emqx/include/asserts.hrl").
|
-include_lib("emqx/include/asserts.hrl").
|
||||||
|
|
||||||
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
|
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
|
-define(CLUSTER, <<"link1">>).
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
|
@ -45,32 +49,32 @@ t_consistent_routing_view(_Config) ->
|
||||||
Actor1 = {?FUNCTION_NAME, 1},
|
Actor1 = {?FUNCTION_NAME, 1},
|
||||||
Actor2 = {?FUNCTION_NAME, 2},
|
Actor2 = {?FUNCTION_NAME, 2},
|
||||||
Actor3 = {?FUNCTION_NAME, 3},
|
Actor3 = {?FUNCTION_NAME, 3},
|
||||||
{ok, AS10} = emqx_cluster_link_extrouter:actor_init(Actor1, 1, env()),
|
{ok, AS10} = actor_init(Actor1, 1),
|
||||||
{ok, AS20} = emqx_cluster_link_extrouter:actor_init(Actor2, 1, env()),
|
{ok, AS20} = actor_init(Actor2, 1),
|
||||||
{ok, AS30} = emqx_cluster_link_extrouter:actor_init(Actor3, 1, env()),
|
{ok, AS30} = actor_init(Actor3, 1),
|
||||||
%% Add few routes originating from different actors.
|
%% Add few routes originating from different actors.
|
||||||
%% Also test that route operations are idempotent.
|
%% Also test that route operations are idempotent.
|
||||||
AS11 = apply_operation({add, <<"t/client/#">>, id}, AS10),
|
AS11 = apply_operation({add, {<<"t/client/#">>, id}}, AS10),
|
||||||
_AS11 = apply_operation({add, <<"t/client/#">>, id}, AS10),
|
_AS11 = apply_operation({add, {<<"t/client/#">>, id}}, AS10),
|
||||||
AS21 = apply_operation({add, <<"t/client/#">>, id}, AS20),
|
AS21 = apply_operation({add, {<<"t/client/#">>, id}}, AS20),
|
||||||
AS31 = apply_operation({add, <<"t/client/+/+">>, id1}, AS30),
|
AS31 = apply_operation({add, {<<"t/client/+/+">>, id1}}, AS30),
|
||||||
AS32 = apply_operation({add, <<"t/client/+/+">>, id2}, AS31),
|
AS32 = apply_operation({add, {<<"t/client/+/+">>, id2}}, AS31),
|
||||||
_AS22 = apply_operation({del, <<"t/client/#">>, id}, AS21),
|
_AS22 = apply_operation({delete, {<<"t/client/#">>, id}}, AS21),
|
||||||
AS12 = apply_operation({add, <<"t/client/+/+">>, id1}, AS11),
|
AS12 = apply_operation({add, {<<"t/client/+/+">>, id1}}, AS11),
|
||||||
AS33 = apply_operation({del, <<"t/client/+/+">>, id1}, AS32),
|
AS33 = apply_operation({delete, {<<"t/client/+/+">>, id1}}, AS32),
|
||||||
_AS34 = apply_operation({del, <<"t/client/+/+">>, id2}, AS33),
|
_AS34 = apply_operation({delete, {<<"t/client/+/+">>, id2}}, AS33),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
[<<"t/client/#">>, <<"t/client/+/+">>],
|
[<<"t/client/#">>, <<"t/client/+/+">>],
|
||||||
topics_sorted()
|
topics_sorted()
|
||||||
),
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
[<<"t/client/#">>],
|
[#route{topic = <<"t/client/#">>, dest = ?CLUSTER}],
|
||||||
lists:sort(emqx_cluster_link_extrouter:match_routes(<<"t/client/42">>))
|
emqx_cluster_link_extrouter:match_routes(<<"t/client/42">>)
|
||||||
),
|
),
|
||||||
%% Remove all routes from the actors.
|
%% Remove all routes from the actors.
|
||||||
AS13 = apply_operation({del, <<"t/client/#">>, id}, AS12),
|
AS13 = apply_operation({delete, {<<"t/client/#">>, id}}, AS12),
|
||||||
AS14 = apply_operation({del, <<"t/client/+/+">>, id1}, AS13),
|
AS14 = apply_operation({delete, {<<"t/client/+/+">>, id1}}, AS13),
|
||||||
AS14 = apply_operation({del, <<"t/client/+/+">>, id1}, AS13),
|
AS14 = apply_operation({delete, {<<"t/client/+/+">>, id1}}, AS13),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
[],
|
[],
|
||||||
topics_sorted()
|
topics_sorted()
|
||||||
|
@ -79,28 +83,28 @@ t_consistent_routing_view(_Config) ->
|
||||||
t_actor_reincarnation(_Config) ->
|
t_actor_reincarnation(_Config) ->
|
||||||
Actor1 = {?FUNCTION_NAME, 1},
|
Actor1 = {?FUNCTION_NAME, 1},
|
||||||
Actor2 = {?FUNCTION_NAME, 2},
|
Actor2 = {?FUNCTION_NAME, 2},
|
||||||
{ok, AS10} = emqx_cluster_link_extrouter:actor_init(Actor1, 1, env()),
|
{ok, AS10} = actor_init(Actor1, 1),
|
||||||
{ok, AS20} = emqx_cluster_link_extrouter:actor_init(Actor2, 1, env()),
|
{ok, AS20} = actor_init(Actor2, 1),
|
||||||
AS11 = apply_operation({add, <<"topic/#">>, id}, AS10),
|
AS11 = apply_operation({add, {<<"topic/#">>, id}}, AS10),
|
||||||
AS12 = apply_operation({add, <<"topic/42/+">>, id}, AS11),
|
AS12 = apply_operation({add, {<<"topic/42/+">>, id}}, AS11),
|
||||||
AS21 = apply_operation({add, <<"topic/#">>, id}, AS20),
|
AS21 = apply_operation({add, {<<"topic/#">>, id}}, AS20),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
[<<"topic/#">>, <<"topic/42/+">>],
|
[<<"topic/#">>, <<"topic/42/+">>],
|
||||||
topics_sorted()
|
topics_sorted()
|
||||||
),
|
),
|
||||||
{ok, _AS3} = emqx_cluster_link_extrouter:actor_init(Actor1, 2, env()),
|
{ok, _AS3} = actor_init(Actor1, 2),
|
||||||
?assertError(
|
?assertError(
|
||||||
_IncarnationMismatch,
|
_IncarnationMismatch,
|
||||||
apply_operation({add, <<"toolate/#">>, id}, AS12)
|
apply_operation({add, {<<"toolate/#">>, id}}, AS12)
|
||||||
),
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
[<<"topic/#">>],
|
[<<"topic/#">>],
|
||||||
topics_sorted()
|
topics_sorted()
|
||||||
),
|
),
|
||||||
{ok, _AS4} = emqx_cluster_link_extrouter:actor_init(Actor2, 2, env()),
|
{ok, _AS4} = actor_init(Actor2, 2),
|
||||||
?assertError(
|
?assertError(
|
||||||
_IncarnationMismatch,
|
_IncarnationMismatch,
|
||||||
apply_operation({add, <<"toolate/#">>, id}, AS21)
|
apply_operation({add, {<<"toolate/#">>, id}}, AS21)
|
||||||
),
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
[],
|
[],
|
||||||
|
@ -110,11 +114,11 @@ t_actor_reincarnation(_Config) ->
|
||||||
t_actor_gc(_Config) ->
|
t_actor_gc(_Config) ->
|
||||||
Actor1 = {?FUNCTION_NAME, 1},
|
Actor1 = {?FUNCTION_NAME, 1},
|
||||||
Actor2 = {?FUNCTION_NAME, 2},
|
Actor2 = {?FUNCTION_NAME, 2},
|
||||||
{ok, AS10} = emqx_cluster_link_extrouter:actor_init(Actor1, 1, env()),
|
{ok, AS10} = actor_init(Actor1, 1),
|
||||||
{ok, AS20} = emqx_cluster_link_extrouter:actor_init(Actor2, 1, env()),
|
{ok, AS20} = actor_init(Actor2, 1),
|
||||||
AS11 = apply_operation({add, <<"topic/#">>, id}, AS10),
|
AS11 = apply_operation({add, {<<"topic/#">>, id}}, AS10),
|
||||||
AS12 = apply_operation({add, <<"topic/42/+">>, id}, AS11),
|
AS12 = apply_operation({add, {<<"topic/42/+">>, id}}, AS11),
|
||||||
AS21 = apply_operation({add, <<"global/#">>, id}, AS20),
|
AS21 = apply_operation({add, {<<"global/#">>, id}}, AS20),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
[<<"global/#">>, <<"topic/#">>, <<"topic/42/+">>],
|
[<<"global/#">>, <<"topic/#">>, <<"topic/42/+">>],
|
||||||
topics_sorted()
|
topics_sorted()
|
||||||
|
@ -127,7 +131,7 @@ t_actor_gc(_Config) ->
|
||||||
),
|
),
|
||||||
?assertError(
|
?assertError(
|
||||||
_IncarnationMismatch,
|
_IncarnationMismatch,
|
||||||
apply_operation({add, <<"toolate/#">>, id}, AS21)
|
apply_operation({add, {<<"toolate/#">>, id}}, AS21)
|
||||||
),
|
),
|
||||||
ok = emqx_cluster_link_extrouter:actor_gc(env(120_000)),
|
ok = emqx_cluster_link_extrouter:actor_gc(env(120_000)),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
|
@ -138,25 +142,25 @@ t_actor_gc(_Config) ->
|
||||||
t_consistent_routing_view_concurrent_updates(_Config) ->
|
t_consistent_routing_view_concurrent_updates(_Config) ->
|
||||||
A1Seq = repeat(10, [
|
A1Seq = repeat(10, [
|
||||||
reincarnate,
|
reincarnate,
|
||||||
{add, <<"t/client/#">>, id},
|
{add, {<<"t/client/#">>, id}},
|
||||||
{add, <<"t/client/+/+">>, id1},
|
{add, {<<"t/client/+/+">>, id1}},
|
||||||
{add, <<"t/client/+/+">>, id1},
|
{add, {<<"t/client/+/+">>, id1}},
|
||||||
{del, <<"t/client/#">>, id}
|
{delete, {<<"t/client/#">>, id}}
|
||||||
]),
|
]),
|
||||||
A2Seq = repeat(10, [
|
A2Seq = repeat(10, [
|
||||||
{add, <<"global/#">>, id},
|
{add, {<<"global/#">>, id}},
|
||||||
{add, <<"t/client/+/+">>, id1},
|
{add, {<<"t/client/+/+">>, id1}},
|
||||||
{add, <<"t/client/+/+">>, id2},
|
{add, {<<"t/client/+/+">>, id2}},
|
||||||
{del, <<"t/client/+/+">>, id1},
|
{delete, {<<"t/client/+/+">>, id1}},
|
||||||
heartbeat
|
heartbeat
|
||||||
]),
|
]),
|
||||||
A3Seq = repeat(10, [
|
A3Seq = repeat(10, [
|
||||||
{add, <<"global/#">>, id},
|
{add, {<<"global/#">>, id}},
|
||||||
{del, <<"global/#">>, id},
|
{delete, {<<"global/#">>, id}},
|
||||||
{add, <<"t/client/+/+">>, id1},
|
{add, {<<"t/client/+/+">>, id1}},
|
||||||
{del, <<"t/client/+/+">>, id1},
|
{delete, {<<"t/client/+/+">>, id1}},
|
||||||
{add, <<"t/client/+/+">>, id2},
|
{add, {<<"t/client/+/+">>, id2}},
|
||||||
{del, <<"t/client/+/+">>, id2},
|
{delete, {<<"t/client/+/+">>, id2}},
|
||||||
reincarnate
|
reincarnate
|
||||||
]),
|
]),
|
||||||
A4Seq = repeat(10, [
|
A4Seq = repeat(10, [
|
||||||
|
@ -197,25 +201,25 @@ t_consistent_routing_view_concurrent_cluster_updates(Config) ->
|
||||||
[N1, N2, N3] = ?config(cluster, Config),
|
[N1, N2, N3] = ?config(cluster, Config),
|
||||||
A1Seq = repeat(10, [
|
A1Seq = repeat(10, [
|
||||||
reincarnate,
|
reincarnate,
|
||||||
{add, <<"t/client/#">>, id},
|
{add, {<<"t/client/#">>, id}},
|
||||||
{add, <<"t/client/+/+">>, id1},
|
{add, {<<"t/client/+/+">>, id1}},
|
||||||
{add, <<"t/client/+/+">>, id1},
|
{add, {<<"t/client/+/+">>, id1}},
|
||||||
{del, <<"t/client/#">>, id}
|
{delete, {<<"t/client/#">>, id}}
|
||||||
]),
|
]),
|
||||||
A2Seq = repeat(10, [
|
A2Seq = repeat(10, [
|
||||||
{add, <<"global/#">>, id},
|
{add, {<<"global/#">>, id}},
|
||||||
{add, <<"t/client/+/+">>, id1},
|
{add, {<<"t/client/+/+">>, id1}},
|
||||||
{add, <<"t/client/+/+">>, id2},
|
{add, {<<"t/client/+/+">>, id2}},
|
||||||
{del, <<"t/client/+/+">>, id1},
|
{delete, {<<"t/client/+/+">>, id1}},
|
||||||
heartbeat
|
heartbeat
|
||||||
]),
|
]),
|
||||||
A3Seq = repeat(10, [
|
A3Seq = repeat(10, [
|
||||||
{add, <<"global/#">>, id},
|
{add, {<<"global/#">>, id}},
|
||||||
{del, <<"global/#">>, id},
|
{delete, {<<"global/#">>, id}},
|
||||||
{add, <<"t/client/+/+">>, id1},
|
{add, {<<"t/client/+/+">>, id1}},
|
||||||
{del, <<"t/client/+/+">>, id1},
|
{delete, {<<"t/client/+/+">>, id1}},
|
||||||
{add, <<"t/client/+/+">>, id2},
|
{add, {<<"t/client/+/+">>, id2}},
|
||||||
{del, <<"t/client/+/+">>, id2},
|
{delete, {<<"t/client/+/+">>, id2}},
|
||||||
reincarnate
|
reincarnate
|
||||||
]),
|
]),
|
||||||
A4Seq = repeat(10, [
|
A4Seq = repeat(10, [
|
||||||
|
@ -259,12 +263,12 @@ run_remote_actor({Node, Run}) ->
|
||||||
erlang:spawn_monitor(Node, ?MODULE, run_actor, [Run]).
|
erlang:spawn_monitor(Node, ?MODULE, run_actor, [Run]).
|
||||||
|
|
||||||
run_actor({Actor, Seq}) ->
|
run_actor({Actor, Seq}) ->
|
||||||
{ok, AS0} = emqx_cluster_link_extrouter:actor_init(Actor, 0, env(0)),
|
{ok, AS0} = actor_init(Actor, 0),
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun
|
fun
|
||||||
({TS, {add, _, _} = Op}, AS) ->
|
({TS, {add, _} = Op}, AS) ->
|
||||||
apply_operation(Op, AS, TS);
|
apply_operation(Op, AS, TS);
|
||||||
({TS, {del, _, _} = Op}, AS) ->
|
({TS, {delete, _} = Op}, AS) ->
|
||||||
apply_operation(Op, AS, TS);
|
apply_operation(Op, AS, TS);
|
||||||
({TS, heartbeat}, AS) ->
|
({TS, heartbeat}, AS) ->
|
||||||
apply_operation(heartbeat, AS, TS);
|
apply_operation(heartbeat, AS, TS);
|
||||||
|
@ -275,7 +279,7 @@ run_actor({Actor, Seq}) ->
|
||||||
ok = timer:sleep(MS),
|
ok = timer:sleep(MS),
|
||||||
AS;
|
AS;
|
||||||
({TS, reincarnate}, _AS) ->
|
({TS, reincarnate}, _AS) ->
|
||||||
{ok, AS} = emqx_cluster_link_extrouter:actor_init(Actor, TS, env(TS)),
|
{ok, AS} = actor_init(Actor, TS, TS),
|
||||||
AS
|
AS
|
||||||
end,
|
end,
|
||||||
AS0,
|
AS0,
|
||||||
|
@ -284,6 +288,12 @@ run_actor({Actor, Seq}) ->
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
|
actor_init(Actor, Incarnation) ->
|
||||||
|
actor_init(Actor, Incarnation, _TS = 0).
|
||||||
|
|
||||||
|
actor_init(Actor, Incarnation, TS) ->
|
||||||
|
emqx_cluster_link_extrouter:actor_init(?CLUSTER, Actor, Incarnation, env(TS)).
|
||||||
|
|
||||||
apply_operation(Op, AS) ->
|
apply_operation(Op, AS) ->
|
||||||
apply_operation(Op, AS, _TS = 42).
|
apply_operation(Op, AS, _TS = 42).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue