Fix reload zone fail bug (#2857)

This commit is contained in:
turtleDeng 2019-08-31 15:03:40 +08:00 committed by Shawn
parent 19a8f0cbf8
commit 0908974017
5 changed files with 43 additions and 64 deletions

View File

@ -126,31 +126,13 @@ default_caps() ->
?DEFAULT_CAPS. ?DEFAULT_CAPS.
get_caps(Zone, publish) -> get_caps(Zone, publish) ->
with_env(Zone, '$mqtt_pub_caps', filter_caps(?PUBCAP_KEYS, get_caps(Zone));
fun() ->
filter_caps(?PUBCAP_KEYS, get_caps(Zone))
end);
get_caps(Zone, subscribe) -> get_caps(Zone, subscribe) ->
with_env(Zone, '$mqtt_sub_caps', filter_caps(?SUBCAP_KEYS, get_caps(Zone)).
fun() ->
filter_caps(?SUBCAP_KEYS, get_caps(Zone))
end).
get_caps(Zone) -> get_caps(Zone) ->
with_env(Zone, '$mqtt_caps', maps:from_list([{Cap, emqx_zone:get_env(Zone, Cap, Def)} || {Cap, Def} <- ?DEFAULT_CAPS]).
fun() ->
maps:from_list([{Cap, emqx_zone:get_env(Zone, Cap, Def)}
|| {Cap, Def} <- ?DEFAULT_CAPS])
end).
filter_caps(Keys, Caps) -> filter_caps(Keys, Caps) ->
maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps). maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps).
with_env(Zone, Key, InitFun) ->
case emqx_zone:get_env(Zone, Key) of
undefined -> Caps = InitFun(),
ok = emqx_zone:set_env(Zone, Key, Caps),
Caps;
ZoneCaps -> ZoneCaps
end.

View File

@ -70,6 +70,7 @@ receive_messages(Count, Msgs) ->
basic_test(_Config) -> basic_test(_Config) ->
Topic = nth(1, ?TOPICS), Topic = nth(1, ?TOPICS),
ct:print("Basic test starting"), ct:print("Basic test starting"),
init_caps(),
{ok, C} = emqx_client:start_link(), {ok, C} = emqx_client:start_link(),
{ok, _} = emqx_client:connect(C), {ok, _} = emqx_client:connect(C),
{ok, _, [1]} = emqx_client:subscribe(C, Topic, qos1), {ok, _, [1]} = emqx_client:subscribe(C, Topic, qos1),
@ -81,6 +82,7 @@ basic_test(_Config) ->
ok = emqx_client:disconnect(C). ok = emqx_client:disconnect(C).
will_message_test(_Config) -> will_message_test(_Config) ->
init_caps(),
{ok, C1} = emqx_client:start_link([{clean_start, true}, {ok, C1} = emqx_client:start_link([{clean_start, true},
{will_topic, nth(3, ?TOPICS)}, {will_topic, nth(3, ?TOPICS)},
{will_payload, <<"client disconnected">>}, {will_payload, <<"client disconnected">>},
@ -99,10 +101,10 @@ will_message_test(_Config) ->
ct:print("Will message test succeeded"). ct:print("Will message test succeeded").
offline_message_queueing_test(_) -> offline_message_queueing_test(_) ->
init_caps(),
{ok, C1} = emqx_client:start_link([{clean_start, false}, {ok, C1} = emqx_client:start_link([{clean_start, false},
{client_id, <<"c1">>}]), {client_id, <<"c1">>}]),
{ok, _} = emqx_client:connect(C1), {ok, _} = emqx_client:connect(C1),
{ok, _, [2]} = emqx_client:subscribe(C1, nth(6, ?WILD_TOPICS), 2), {ok, _, [2]} = emqx_client:subscribe(C1, nth(6, ?WILD_TOPICS), 2),
ok = emqx_client:disconnect(C1), ok = emqx_client:disconnect(C1),
{ok, C2} = emqx_client:start_link([{clean_start, true}, {ok, C2} = emqx_client:start_link([{clean_start, true},
@ -123,6 +125,7 @@ offline_message_queueing_test(_) ->
?assertEqual(3, length(receive_messages(3))). ?assertEqual(3, length(receive_messages(3))).
overlapping_subscriptions_test(_) -> overlapping_subscriptions_test(_) ->
init_caps(),
{ok, C} = emqx_client:start_link([]), {ok, C} = emqx_client:start_link([]),
{ok, _} = emqx_client:connect(C), {ok, _} = emqx_client:connect(C),
@ -163,6 +166,7 @@ overlapping_subscriptions_test(_) ->
redelivery_on_reconnect_test(_) -> redelivery_on_reconnect_test(_) ->
ct:print("Redelivery on reconnect test starting"), ct:print("Redelivery on reconnect test starting"),
init_caps(),
{ok, C1} = emqx_client:start_link([{clean_start, false}, {ok, C1} = emqx_client:start_link([{clean_start, false},
{client_id, <<"c">>}]), {client_id, <<"c">>}]),
{ok, _} = emqx_client:connect(C1), {ok, _} = emqx_client:connect(C1),
@ -194,6 +198,7 @@ redelivery_on_reconnect_test(_) ->
dollar_topics_test(_) -> dollar_topics_test(_) ->
ct:print("$ topics test starting"), ct:print("$ topics test starting"),
init_caps(),
{ok, C} = emqx_client:start_link([{clean_start, true}, {ok, C} = emqx_client:start_link([{clean_start, true},
{keepalive, 0}]), {keepalive, 0}]),
{ok, _} = emqx_client:connect(C), {ok, _} = emqx_client:connect(C),
@ -205,3 +210,13 @@ dollar_topics_test(_) ->
?assertEqual(0, length(receive_messages(1))), ?assertEqual(0, length(receive_messages(1))),
ok = emqx_client:disconnect(C), ok = emqx_client:disconnect(C),
ct:print("$ topics test succeeded"). ct:print("$ topics test succeeded").
init_caps() ->
Caps = #{max_qos_allowed => 2,
max_topic_levels => 0,
mqtt_shared_subscription => true,
mqtt_wildcard_subscription => true,
max_topic_alias => 0,
mqtt_retain_available => true},
[emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(Caps)],
timer:sleep(100).

View File

@ -27,38 +27,19 @@ all() -> [t_get_set_caps, t_check_pub, t_check_sub].
t_get_set_caps(_) -> t_get_set_caps(_) ->
{ok, _} = emqx_zone:start_link(), {ok, _} = emqx_zone:start_link(),
Caps = #{
max_packet_size => ?MAX_PACKET_SIZE, PubCaps = emqx_mqtt_caps:get_caps(external, publish),
max_clientid_len => ?MAX_CLIENTID_LEN,
max_topic_alias => 0,
max_topic_levels => 0,
max_qos_allowed => ?QOS_2,
mqtt_retain_available => true,
mqtt_shared_subscription => true,
mqtt_wildcard_subscription => true
},
Caps2 = Caps#{max_packet_size => 1048576},
case emqx_mqtt_caps:get_caps(zone) of
Caps -> ok;
Caps2 -> ok
end,
PubCaps = #{
max_qos_allowed => ?QOS_2,
mqtt_retain_available => true,
max_topic_alias => 0
},
PubCaps = emqx_mqtt_caps:get_caps(zone, publish),
NewPubCaps = PubCaps#{max_qos_allowed => ?QOS_1}, NewPubCaps = PubCaps#{max_qos_allowed => ?QOS_1},
emqx_zone:set_env(zone, '$mqtt_pub_caps', NewPubCaps), [emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(NewPubCaps)],
timer:sleep(100), timer:sleep(100),
NewPubCaps = emqx_mqtt_caps:get_caps(zone, publish), NewPubCaps = emqx_mqtt_caps:get_caps(external, publish),
SubCaps = #{
max_topic_levels => 0, SubCaps = emqx_mqtt_caps:get_caps(external, subscribe),
max_qos_allowed => ?QOS_2, NewSubCaps = SubCaps#{max_topic_levels => 2},
mqtt_shared_subscription => true, [emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(NewSubCaps)],
mqtt_wildcard_subscription => true timer:sleep(100),
}, NewSubCaps = emqx_mqtt_caps:get_caps(external, subscribe),
SubCaps = emqx_mqtt_caps:get_caps(zone, subscribe),
emqx_zone:stop(). emqx_zone:stop().
t_check_pub(_) -> t_check_pub(_) ->
@ -68,35 +49,34 @@ t_check_pub(_) ->
mqtt_retain_available => false, mqtt_retain_available => false,
max_topic_alias => 4 max_topic_alias => 4
}, },
emqx_zone:set_env(zone, '$mqtt_pub_caps', PubCaps), [emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(PubCaps)],
timer:sleep(100), timer:sleep(100),
ct:log("~p", [emqx_mqtt_caps:get_caps(zone, publish)]), ct:log("~p", [emqx_mqtt_caps:get_caps(external, publish)]),
BadPubProps1 = #{ BadPubProps1 = #{
qos => ?QOS_2, qos => ?QOS_2,
retain => false retain => false
}, },
{error, ?RC_QOS_NOT_SUPPORTED} = emqx_mqtt_caps:check_pub(zone, BadPubProps1), {error, ?RC_QOS_NOT_SUPPORTED} = emqx_mqtt_caps:check_pub(external, BadPubProps1),
BadPubProps2 = #{ BadPubProps2 = #{
qos => ?QOS_1, qos => ?QOS_1,
retain => true retain => true
}, },
{error, ?RC_RETAIN_NOT_SUPPORTED} = emqx_mqtt_caps:check_pub(zone, BadPubProps2), {error, ?RC_RETAIN_NOT_SUPPORTED} = emqx_mqtt_caps:check_pub(external, BadPubProps2),
BadPubProps3 = #{ BadPubProps3 = #{
qos => ?QOS_1, qos => ?QOS_1,
retain => false, retain => false,
topic_alias => 5 topic_alias => 5
}, },
{error, ?RC_TOPIC_ALIAS_INVALID} = emqx_mqtt_caps:check_pub(zone, BadPubProps3), {error, ?RC_TOPIC_ALIAS_INVALID} = emqx_mqtt_caps:check_pub(external, BadPubProps3),
PubProps = #{ PubProps = #{
qos => ?QOS_1, qos => ?QOS_1,
retain => false retain => false
}, },
ok = emqx_mqtt_caps:check_pub(zone, PubProps), ok = emqx_mqtt_caps:check_pub(external, PubProps),
emqx_zone:stop(). emqx_zone:stop().
t_check_sub(_) -> t_check_sub(_) ->
{ok, _} = emqx_zone:start_link(), {ok, _} = emqx_zone:start_link(),
Opts = #{qos => ?QOS_2, share => true, rc => 0}, Opts = #{qos => ?QOS_2, share => true, rc => 0},
Caps = #{ Caps = #{
max_topic_levels => 0, max_topic_levels => 0,
@ -104,6 +84,8 @@ t_check_sub(_) ->
mqtt_shared_subscription => true, mqtt_shared_subscription => true,
mqtt_wildcard_subscription => true mqtt_wildcard_subscription => true
}, },
[emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(Caps)],
timer:sleep(100),
ok = do_check_sub([{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts}]), ok = do_check_sub([{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts}]),
ok = do_check_sub(Caps#{max_qos_allowed => ?QOS_1}, [{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts#{qos => ?QOS_1}}]), ok = do_check_sub(Caps#{max_qos_allowed => ?QOS_1}, [{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts#{qos => ?QOS_1}}]),
@ -122,10 +104,10 @@ t_check_sub(_) ->
do_check_sub(TopicFilters, Topics) -> do_check_sub(TopicFilters, Topics) ->
{ok, Topics} = emqx_mqtt_caps:check_sub(zone, TopicFilters), {ok, Topics} = emqx_mqtt_caps:check_sub(external, TopicFilters),
ok. ok.
do_check_sub(Caps, TopicFilters, Topics) -> do_check_sub(Caps, TopicFilters, Topics) ->
emqx_zone:set_env(zone, '$mqtt_sub_caps', Caps), [emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(Caps)],
timer:sleep(100), timer:sleep(100),
{_, Topics} = emqx_mqtt_caps:check_sub(zone, TopicFilters), {_, Topics} = emqx_mqtt_caps:check_sub(external, TopicFilters),
ok. ok.

View File

@ -56,7 +56,7 @@ groups() ->
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_ct_helpers:start_apps([], fun set_special_configs/1), emqx_ct_helpers:start_apps([], fun set_special_configs/1),
MqttCaps = maps:from_list(emqx_mqtt_caps:default_caps()), MqttCaps = maps:from_list(emqx_mqtt_caps:default_caps()),
emqx_zone:set_env(external, '$mqtt_caps', MqttCaps#{max_topic_alias => 20}), [emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(MqttCaps#{max_topic_alias => 20})],
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->

View File

@ -89,7 +89,7 @@ message_expiry_interval_exipred(ClientA, QoS) ->
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
%% publish to t/a and waiting for the message expired %% publish to t/a and waiting for the message expired
emqx_client:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]), emqx_client:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]),
ct:sleep(1000), ct:sleep(2000),
%% resume the session for client-b %% resume the session for client-b
{ok, ClientB1} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), {ok, ClientB1} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),