chore: add test cases for MQTT Bridge reconnecting

This commit is contained in:
Shawn 2022-09-15 08:57:17 +08:00
parent 4e211c12d3
commit d5d3972ff5
4 changed files with 121 additions and 4 deletions

View File

@ -481,5 +481,121 @@ t_egress_mqtt_bridge_with_rules(_) ->
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []). {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []).
t_mqtt_conn_bridge_egress_reconnect(_) ->
%% then we add a mqtt connector, using POST
User1 = <<"user1">>,
{ok, 201, Bridge} = request(
post,
uri(["bridges"]),
?SERVER_CONF(User1)#{
<<"type">> => ?TYPE_MQTT,
<<"name">> => ?BRIDGE_NAME_EGRESS,
<<"egress">> => ?EGRESS_CONF,
%% to make it reconnect quickly
<<"reconnect_interval">> => <<"1s">>,
<<"resource_opts">> => #{
<<"worker_pool_size">> => 2,
<<"enable_queue">> => true,
<<"query_mode">> => <<"sync">>,
%% to make it check the healthy quickly
<<"health_check_interval">> => <<"0.5s">>
}
}
),
#{
<<"type">> := ?TYPE_MQTT,
<<"name">> := ?BRIDGE_NAME_EGRESS
} = jsx:decode(Bridge),
BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
%% we now test if the bridge works as expected
LocalTopic = <<"local_topic/1">>,
RemoteTopic = <<"remote_topic/", LocalTopic/binary>>,
Payload = <<"hello">>,
emqx:subscribe(RemoteTopic),
timer:sleep(100),
%% PUBLISH a message to the 'local' broker, as we have only one broker,
%% the remote broker is also the local one.
emqx:publish(emqx_message:make(LocalTopic, Payload)),
%% we should receive a message on the "remote" broker, with specified topic
assert_mqtt_msg_received(RemoteTopic, Payload),
%% verify the metrics of the bridge
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
?assertMatch(
#{
<<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0},
<<"node_metrics">> :=
[
#{
<<"node">> := _,
<<"metrics">> :=
#{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0}
}
]
},
jsx:decode(BridgeStr)
),
%% stop the listener 1883 to make the bridge disconnected
ok = emqx_listeners:stop_listener('tcp:default'),
%% PUBLISH 2 messages to the 'local' broker, the message should
emqx:publish(emqx_message:make(LocalTopic, Payload)),
emqx:publish(emqx_message:make(LocalTopic, Payload)),
%% verify the metrics of the bridge, the message should be queued
{ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []),
?assertMatch(
#{
<<"status">> := Status,
<<"metrics">> := #{
<<"matched">> := 3, <<"success">> := 1, <<"failed">> := 0, <<"queuing">> := 2
}
} when Status == <<"connected">> orelse Status == <<"connecting">>,
jsx:decode(BridgeStr1)
),
%% start the listener 1883 to make the bridge reconnected
ok = emqx_listeners:start_listener('tcp:default'),
timer:sleep(1500),
%% verify the metrics of the bridge, the 2 queued messages should have been sent
{ok, 200, BridgeStr2} = request(get, uri(["bridges", BridgeIDEgress]), []),
?assertMatch(
#{
<<"status">> := <<"connected">>,
<<"metrics">> := #{
<<"matched">> := 3,
<<"success">> := 3,
<<"failed">> := 0,
<<"queuing">> := 0,
<<"retried">> := R
}
} when R > 0,
jsx:decode(BridgeStr2)
),
%% also verify the 2 messages have been sent to the remote broker
assert_mqtt_msg_received(RemoteTopic, Payload),
assert_mqtt_msg_received(RemoteTopic, Payload),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
ok.
assert_mqtt_msg_received(Topic, Payload) ->
?assert(
receive
{deliver, Topic, #message{payload = Payload}} ->
ct:pal("Got mqtt message: ~p on topic ~p", [Payload, Topic]),
true;
Msg ->
ct:pal("Unexpected Msg: ~p", [Msg]),
false
after 100 ->
false
end
).
request(Method, Url, Body) -> request(Method, Url, Body) ->
request(<<"connector_admin">>, Method, Url, Body). request(<<"connector_admin">>, Method, Url, Body).

View File

@ -276,7 +276,7 @@ idle({call, From}, ensure_started, State) ->
{error, Reason, _State} -> {error, Reason, _State} ->
{keep_state_and_data, [{reply, From, {error, Reason}}]} {keep_state_and_data, [{reply, From, {error, Reason}}]}
end; end;
idle({call, From}, send_to_remote, _State) -> idle({call, From}, {send_to_remote, _}, _State) ->
{keep_state_and_data, [{reply, From, {error, {recoverable_error, not_connected}}}]}; {keep_state_and_data, [{reply, From, {error, {recoverable_error, not_connected}}}]};
%% @doc Standing by for manual start. %% @doc Standing by for manual start.
idle(info, idle, #{start_type := manual}) -> idle(info, idle, #{start_type := manual}) ->
@ -342,7 +342,7 @@ common(_StateName, {call, From}, get_forwards, #{connect_opts := #{forwards := F
common(_StateName, {call, From}, get_subscriptions, #{connection := Connection}) -> common(_StateName, {call, From}, get_subscriptions, #{connection := Connection}) ->
{keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, #{})}]}; {keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, #{})}]};
common(_StateName, {call, From}, Req, _State) -> common(_StateName, {call, From}, Req, _State) ->
{keep_state_and_data, [{reply, From, {error, {unsuppored_request, Req}}}]}; {keep_state_and_data, [{reply, From, {error, {unsupported_request, Req}}}]};
common(_StateName, info, {'EXIT', _, _}, State) -> common(_StateName, info, {'EXIT', _, _}, State) ->
{keep_state, State}; {keep_state, State};
common(StateName, Type, Content, #{name := Name} = State) -> common(StateName, Type, Content, #{name := Name} = State) ->

View File

@ -525,7 +525,7 @@ handle_connecting_health_check(Data) ->
(connected, UpdatedData) -> (connected, UpdatedData) ->
{next_state, connected, UpdatedData}; {next_state, connected, UpdatedData};
(connecting, UpdatedData) -> (connecting, UpdatedData) ->
Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}], Actions = [{state_timeout, health_check_interval(Data#data.opts), health_check}],
{keep_state, UpdatedData, Actions}; {keep_state, UpdatedData, Actions};
(disconnected, UpdatedData) -> (disconnected, UpdatedData) ->
{next_state, disconnected, UpdatedData} {next_state, disconnected, UpdatedData}

View File

@ -133,6 +133,7 @@ init({Id, Index, Opts}) ->
end, end,
emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', queue_count(Queue)), emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', queue_count(Queue)),
ok = inflight_new(Name), ok = inflight_new(Name),
HCItvl = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
St = #{ St = #{
id => Id, id => Id,
index => Index, index => Index,
@ -142,7 +143,7 @@ init({Id, Index, Opts}) ->
batch_size => BatchSize, batch_size => BatchSize,
batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
queue => Queue, queue => Queue,
resume_interval => maps:get(resume_interval, Opts, ?HEALTHCHECK_INTERVAL), resume_interval => maps:get(resume_interval, Opts, HCItvl),
acc => [], acc => [],
acc_left => BatchSize, acc_left => BatchSize,
tref => undefined tref => undefined