Ack replayq and allow retry in tests

This commit is contained in:
spring2maz 2019-02-26 08:45:15 +01:00 committed by Gilbert Wong
parent 75163e21f3
commit 9dbc34c376
6 changed files with 77 additions and 59 deletions

View File

@ -957,8 +957,7 @@ connected(cast, ?PACKET(?PINGRESP), State) ->
end; end;
connected(cast, ?DISCONNECT_PACKET(ReasonCode, Properties), State) -> connected(cast, ?DISCONNECT_PACKET(ReasonCode, Properties), State) ->
ok = eval_msg_handler(State, disconnected, {ReasonCode, Properties}), {stop, {disconnected, ReasonCode, Properties}, State};
{stop, disconnected, State};
connected(info, {timeout, _TRef, keepalive}, State = #state{force_ping = true}) -> connected(info, {timeout, _TRef, keepalive}, State = #state{force_ping = true}) ->
case send(?PACKET(?PINGREQ), State) of case send(?PACKET(?PINGREQ), State) of
@ -1042,10 +1041,18 @@ handle_event(EventType, EventContent, StateName, StateData) ->
{keep_state, StateData}. {keep_state, StateData}.
%% Mandatory callback functions %% Mandatory callback functions
terminate(_Reason, _State, #state{socket = undefined}) -> terminate(Reason, _StateName, State = #state{socket = Socket}) ->
ok; case Reason of
terminate(_Reason, _State, #state{socket = Socket}) -> {disconnected, ReasonCode, Properties} ->
emqx_client_sock:close(Socket). %% backward compatible
ok = eval_msg_handler(State, disconnected, {ReasonCode, Properties});
_ ->
ok = eval_msg_handler(State, disconnected, Reason)
end,
case Socket =:= undefined of
true -> ok;
_ -> emqx_client_sock:close(Socket)
end.
code_change(_Vsn, State, Data, _Extra) -> code_change(_Vsn, State, Data, _Extra) ->
{ok, State, Data}. {ok, State, Data}.
@ -1276,6 +1283,10 @@ eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER,
%% Special handling for disconnected message when there is no handler callback %% Special handling for disconnected message when there is no handler callback
Owner ! {disconnected, ReasonCode, Properties}, Owner ! {disconnected, ReasonCode, Properties},
ok; ok;
eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER},
disconnected, _OtherReason) ->
%% do nothing to be backward compatible
ok;
eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER, eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER,
owner = Owner}, Kind, Msg) -> owner = Owner}, Kind, Msg) ->
Owner ! {Kind, Msg}, Owner ! {Kind, Msg},

View File

@ -64,8 +64,7 @@
-export([start_link/2, -export([start_link/2,
import_batch/2, import_batch/2,
handle_ack/2, handle_ack/2,
stop/1 stop/1]).
]).
%% gen_statem callbacks %% gen_statem callbacks
-export([terminate/3, code_change/4, init/1, callback_mode/0]). -export([terminate/3, code_change/4, init/1, callback_mode/0]).
@ -74,15 +73,13 @@
-export([standing_by/3, connecting/3, connected/3]). -export([standing_by/3, connecting/3, connected/3]).
%% management APIs %% management APIs
-export([start_bridge/1, stop_bridge/1, status/1]). -export([ensure_started/1, ensure_started/2, ensure_stopped/1, ensure_stopped/2, status/1]).
-export([ensure_started/2, ensure_stopped/1, ensure_stopped/2]).
-export([get_forwards/1, ensure_forward_present/2, ensure_forward_absent/2]). -export([get_forwards/1, ensure_forward_present/2, ensure_forward_absent/2]).
-export([get_subscriptions/1, ensure_subscription_present/3, ensure_subscription_absent/2]). -export([get_subscriptions/1, ensure_subscription_present/3, ensure_subscription_absent/2]).
-export_type([config/0, -export_type([config/0,
batch/0, batch/0,
ack_ref/0 ack_ref/0]).
]).
-type id() :: atom() | string() | pid(). -type id() :: atom() | string() | pid().
-type qos() :: emqx_mqtt_types:qos(). -type qos() :: emqx_mqtt_types:qos().
@ -112,7 +109,7 @@
%% max_inflight_batches: Max number of batches allowed to send-ahead before %% max_inflight_batches: Max number of batches allowed to send-ahead before
%% receiving confirmation from remote node/cluster %% receiving confirmation from remote node/cluster
%% mountpoint: The topic mount point for messages sent to remote node/cluster %% mountpoint: The topic mount point for messages sent to remote node/cluster
%% `undefined', `<<>>' or `""' to disalble %% `undefined', `<<>>' or `""' to disable
%% forwards: Local topics to subscribe. %% forwards: Local topics to subscribe.
%% queue.batch_bytes_limit: Max number of bytes to collect in a batch for each %% queue.batch_bytes_limit: Max number of bytes to collect in a batch for each
%% send call towards emqx_portal_connect %% send call towards emqx_portal_connect
@ -129,6 +126,9 @@ start_link(Name, Config) ->
gen_statem:start_link({local, name(Name)}, ?MODULE, Config, []). gen_statem:start_link({local, name(Name)}, ?MODULE, Config, []).
%% @doc Manually start portal worker. State idempotency ensured. %% @doc Manually start portal worker. State idempotency ensured.
ensure_started(Name) ->
gen_statem:call(name(Name), ensure_started).
ensure_started(Name, Config) -> ensure_started(Name, Config) ->
case start_link(Name, Config) of case start_link(Name, Config) of
{ok, Pid} -> {ok, Pid}; {ok, Pid} -> {ok, Pid};
@ -162,12 +162,6 @@ ensure_stopped(Id, Timeout) ->
stop(Pid) -> gen_statem:stop(Pid). stop(Pid) -> gen_statem:stop(Pid).
start_bridge(Name) ->
gen_statem:call(name(Name), ensure_started).
stop_bridge(Name) ->
gen_statem:call(name(Name), ensure_stopped).
status(Pid) -> status(Pid) ->
gen_statem:call(Pid, status). gen_statem:call(Pid, status).
@ -279,15 +273,11 @@ standing_by(enter, _, #{start_type := manual}) ->
keep_state_and_data; keep_state_and_data;
standing_by({call, From}, ensure_started, State) -> standing_by({call, From}, ensure_started, State) ->
{next_state, connecting, State, {next_state, connecting, State,
[{reply, From, <<"starting bridge ......">>}]}; [{reply, From, ok}]};
standing_by({call, From}, ensure_stopped, _State) ->
{keep_state_and_data, [{reply, From, <<"bridge not started">>}]};
standing_by({call, From}, status, _State) ->
{keep_state_and_data, [{reply, From, <<"Stopped">>}]};
standing_by(state_timeout, do_connect, State) -> standing_by(state_timeout, do_connect, State) ->
{next_state, connecting, State}; {next_state, connecting, State};
standing_by({call, From}, _Call, _State) -> standing_by({call, From}, _Call, _State) ->
{keep_state_and_data, [{reply, From, {error, standing_by}}]}; {keep_state_and_data, [{reply, From, {error,standing_by}}]};
standing_by(info, Info, State) -> standing_by(info, Info, State) ->
?INFO("Portal ~p discarded info event at state standing_by:\n~p", [name(), Info]), ?INFO("Portal ~p discarded info event at state standing_by:\n~p", [name(), Info]),
{keep_state_and_data, State}; {keep_state_and_data, State};
@ -308,6 +298,7 @@ connecting(enter, _, #{reconnect_delay_ms := Timeout,
ok = subscribe_local_topics(Forwards), ok = subscribe_local_topics(Forwards),
case ConnectFun(Subs) of case ConnectFun(Subs) of
{ok, ConnRef, Conn} -> {ok, ConnRef, Conn} ->
?INFO("Portal ~p connected", [name()]),
Action = {state_timeout, 0, connected}, Action = {state_timeout, 0, connected},
{keep_state, State#{conn_ref => ConnRef, connection => Conn}, Action}; {keep_state, State#{conn_ref => ConnRef, connection => Conn}, Action};
error -> error ->
@ -318,10 +309,6 @@ connecting(state_timeout, connected, State) ->
{next_state, connected, State}; {next_state, connected, State};
connecting(state_timeout, reconnect, _State) -> connecting(state_timeout, reconnect, _State) ->
repeat_state_and_data; repeat_state_and_data;
connecting({call, From}, status, _State) ->
{keep_state_and_data, [{reply, From, <<"Stopped">>}]};
connecting({call, From}, _Call, _State) ->
{keep_state_and_data, [{reply, From, <<"starting bridge ......">>}]};
connecting(info, {batch_ack, Ref}, State) -> connecting(info, {batch_ack, Ref}, State) ->
case do_ack(State, Ref) of case do_ack(State, Ref) of
{ok, NewState} -> {ok, NewState} ->
@ -329,6 +316,10 @@ connecting(info, {batch_ack, Ref}, State) ->
_ -> _ ->
keep_state_and_data keep_state_and_data
end; end;
connecting(internal, maybe_send, _State) ->
keep_state_and_data;
connecting(info, {disconnected, _Ref, _Reason}, _State) ->
keep_state_and_data;
connecting(Type, Content, State) -> connecting(Type, Content, State) ->
common(connecting, Type, Content, State). common(connecting, Type, Content, State).
@ -353,23 +344,23 @@ connected(internal, maybe_send, State) ->
{error, NewState} -> {error, NewState} ->
{next_state, connecting, disconnect(NewState)} {next_state, connecting, disconnect(NewState)}
end; end;
connected({call, From}, ensure_started, _State) ->
{keep_state_and_data, [{reply, From, <<"bridge already started">>}]};
connected({call, From}, status, _State) ->
{keep_state_and_data, [{reply, From, <<"Running">>}]};
connected(info, {disconnected, ConnRef, Reason}, connected(info, {disconnected, ConnRef, Reason},
#{conn_ref := ConnRef, connection := Conn} = State) -> #{conn_ref := ConnRefCurrent, connection := Conn} = State) ->
?INFO("Portal ~p diconnected~nreason=~p", case ConnRefCurrent =:= ConnRef of
[name(), Conn, Reason]), true ->
{next_state, connecting, ?INFO("Portal ~p diconnected~nreason=~p", [name(), Conn, Reason]),
State#{conn_ref := undefined, {next_state, connecting,
connection := undefined}}; State#{conn_ref := undefined, connection := undefined}};
false ->
keep_state_and_data
end;
connected(info, {batch_ack, Ref}, State) -> connected(info, {batch_ack, Ref}, State) ->
case do_ack(State, Ref) of case do_ack(State, Ref) of
stale -> stale ->
keep_state_and_data; keep_state_and_data;
bad_order -> bad_order ->
%% try re-connect then re-send %% try re-connect then re-send
?ERROR("Bad order ack received by portal ~p", [name()]),
{next_state, connecting, disconnect(State)}; {next_state, connecting, disconnect(State)};
{ok, NewState} -> {ok, NewState} ->
{keep_state, NewState, ?maybe_send} {keep_state, NewState, ?maybe_send}
@ -378,6 +369,8 @@ connected(Type, Content, State) ->
common(connected, Type, Content, State). common(connected, Type, Content, State).
%% Common handlers %% Common handlers
common(StateName, {call, From}, status, _State) ->
{keep_state_and_data, [{reply, From, StateName}]};
common(_StateName, {call, From}, ensure_started, _State) -> common(_StateName, {call, From}, ensure_started, _State) ->
{keep_state_and_data, [{reply, From, ok}]}; {keep_state_and_data, [{reply, From, ok}]};
common(_StateName, {call, From}, get_forwards, #{forwards := Forwards}) -> common(_StateName, {call, From}, get_forwards, #{forwards := Forwards}) ->
@ -392,13 +385,13 @@ common(_StateName, {call, From}, {ensure_absent, What, Topic}, State) ->
{keep_state, NewState, [{reply, From, Result}]}; {keep_state, NewState, [{reply, From, Result}]};
common(_StateName, {call, From}, ensure_stopped, _State) -> common(_StateName, {call, From}, ensure_stopped, _State) ->
{stop_and_reply, {shutdown, manual}, {stop_and_reply, {shutdown, manual},
[{reply, From, <<"stop bridge successfully">>}]}; [{reply, From, ok}]};
common(_StateName, info, {dispatch, _, Msg}, common(_StateName, info, {dispatch, _, Msg},
#{replayq := Q} = State) -> #{replayq := Q} = State) ->
NewQ = replayq:append(Q, collect([Msg])), NewQ = replayq:append(Q, collect([Msg])),
{keep_state, State#{replayq => NewQ}, ?maybe_send}; {keep_state, State#{replayq => NewQ}, ?maybe_send};
common(StateName, Type, Content, State) -> common(StateName, Type, Content, State) ->
?INFO("Portal ~p discarded ~p type event at state ~p:~p", ?INFO("Portal ~p discarded ~p type event at state ~p:\n~p",
[name(), Type, StateName, Content]), [name(), Type, StateName, Content]),
{keep_state, State}. {keep_state, State}.
@ -497,15 +490,16 @@ do_send(State = #{inflight := Inflight}, QAckRef, [_ | _] = Batch) ->
%% this is a list of inflight BATCHes, not expecting it to be too long %% this is a list of inflight BATCHes, not expecting it to be too long
NewInflight = Inflight ++ [#{q_ack_ref => QAckRef, NewInflight = Inflight ++ [#{q_ack_ref => QAckRef,
send_ack_ref => Ref, send_ack_ref => Ref,
batch => Batch batch => Batch}],
}],
{ok, State#{inflight := NewInflight}}; {ok, State#{inflight := NewInflight}};
{error, Reason} -> {error, Reason} ->
?INFO("Batch produce failed\n~p", [Reason]), ?INFO("Batch produce failed\n~p", [Reason]),
{error, State} {error, State}
end. end.
do_ack(State = #{inflight := [#{send_ack_ref := Ref} | Rest]}, Ref) -> do_ack(State = #{inflight := [#{send_ack_ref := Refx, q_ack_ref := QAckRef} | Rest],
replayq := Q}, Ref) when Refx =:= Ref ->
ok = replayq:ack(Q, QAckRef),
{ok, State#{inflight := Rest}}; {ok, State#{inflight := Rest}};
do_ack(#{inflight := Inflight}, Ref) -> do_ack(#{inflight := Inflight}, Ref) ->
case lists:any(fun(#{send_ack_ref := Ref0}) -> Ref0 =:= Ref end, Inflight) of case lists:any(fun(#{send_ack_ref := Ref0}) -> Ref0 =:= Ref end, Inflight) of
@ -533,8 +527,7 @@ disconnect(#{connection := Conn,
} = State) when Conn =/= undefined -> } = State) when Conn =/= undefined ->
ok = Module:stop(ConnRef, Conn), ok = Module:stop(ConnRef, Conn),
State#{conn_ref => undefined, State#{conn_ref => undefined,
connection => undefined connection => undefined};
};
disconnect(State) -> State. disconnect(State) -> State.
%% Called only when replayq needs to dump it to disk. %% Called only when replayq needs to dump it to disk.

View File

@ -74,8 +74,8 @@ start(Config = #{address := Address}) ->
end. end.
stop(Ref, #{ack_collector := AckCollector, client_pid := Pid}) -> stop(Ref, #{ack_collector := AckCollector, client_pid := Pid}) ->
safe_stop(AckCollector, fun() -> AckCollector ! ?STOP(Ref) end, 1000),
safe_stop(Pid, fun() -> emqx_client:stop(Pid) end, 1000), safe_stop(Pid, fun() -> emqx_client:stop(Pid) end, 1000),
safe_stop(AckCollector, fun() -> AckCollector ! ?STOP(Ref) end, 1000),
ok. ok.
ensure_subscribed(#{client_pid := Pid}, Topic, QoS) when is_pid(Pid) -> ensure_subscribed(#{client_pid := Pid}, Topic, QoS) when is_pid(Pid) ->
@ -120,7 +120,7 @@ send(#{client_pid := ClientPid, ack_collector := AckCollector} = Conn, [Msg | Re
{ok, PktId} -> {ok, PktId} ->
send(Conn, Rest, [PktId | Acc]); send(Conn, Rest, [PktId | Acc]);
{error, {_PacketId, inflight_full}} -> {error, {_PacketId, inflight_full}} ->
timer:sleep(100), timer:sleep(10),
send(Conn, Batch, Acc); send(Conn, Batch, Acc);
{error, Reason} -> {error, Reason} ->
%% NOTE: There is no partial sucess of a batch and recover from the middle %% NOTE: There is no partial sucess of a batch and recover from the middle
@ -176,7 +176,7 @@ import_msg(Msg) ->
make_hdlr(Parent, AckCollector, Ref) -> make_hdlr(Parent, AckCollector, Ref) ->
#{puback => fun(Ack) -> handle_puback(AckCollector, Ack) end, #{puback => fun(Ack) -> handle_puback(AckCollector, Ack) end,
publish => fun(Msg) -> import_msg(Msg) end, publish => fun(Msg) -> import_msg(Msg) end,
disconnected => fun(RC, _Properties) -> Parent ! {disconnected, Ref, RC}, ok end disconnected => fun(Reason) -> Parent ! {disconnected, Ref, Reason}, ok end
}. }.
subscribe_remote_topics(ClientPid, Subscriptions) -> subscribe_remote_topics(ClientPid, Subscriptions) ->

View File

@ -16,7 +16,7 @@
-behavior(supervisor). -behavior(supervisor).
-export([start_link/0, start_link/1, portals/0]). -export([start_link/0, start_link/1, portals/0]).
-export([create_portal/2, drop_portal/1]).
-export([init/1]). -export([init/1]).
-define(SUP, ?MODULE). -define(SUP, ?MODULE).
@ -46,3 +46,15 @@ portal_spec({Name, Config}) ->
-spec(portals() -> [{node(), map()}]). -spec(portals() -> [{node(), map()}]).
portals() -> portals() ->
[{Name, emqx_portal:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?SUP)]. [{Name, emqx_portal:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?SUP)].
create_portal(Id, Config) ->
supervisor:start_child(?SUP, portal_spec({Id, Config})).
drop_portal(Id) ->
case supervisor:terminate_child(?SUP, Id) of
ok ->
supervisor:delete_child(?SUP, Id);
Error ->
emqx_logger:error("[Bridge] Delete bridge failed", [Error]),
Error
end.

View File

@ -146,8 +146,6 @@ t_mqtt(Config) when is_list(Config) ->
?assertEqual([{ForwardedTopic, 1}], emqx_portal:get_subscriptions(Pid)), ?assertEqual([{ForwardedTopic, 1}], emqx_portal:get_subscriptions(Pid)),
ok = emqx_portal:ensure_subscription_present(Pid, ForwardedTopic2, _QoS = 1), ok = emqx_portal:ensure_subscription_present(Pid, ForwardedTopic2, _QoS = 1),
ok = emqx_portal:ensure_forward_present(Pid, SendToTopic2), ok = emqx_portal:ensure_forward_present(Pid, SendToTopic2),
%% TODO: investigate why it's necessary
timer:sleep(1000),
?assertEqual([{ForwardedTopic, 1}, ?assertEqual([{ForwardedTopic, 1},
{ForwardedTopic2, 1}], emqx_portal:get_subscriptions(Pid)), {ForwardedTopic2, 1}], emqx_portal:get_subscriptions(Pid)),
{ok, ConnPid} = emqx_mock_client:start_link(ClientId), {ok, ConnPid} = emqx_mock_client:start_link(ClientId),
@ -182,13 +180,16 @@ receive_and_match_messages(Ref, Msgs) ->
ok. ok.
do_receive_and_match_messages(_Ref, []) -> ok; do_receive_and_match_messages(_Ref, []) -> ok;
do_receive_and_match_messages(Ref, [I | Rest]) -> do_receive_and_match_messages(Ref, [I | Rest] = Exp) ->
receive receive
{Ref, timeout} -> erlang:error(timeout); {Ref, timeout} -> erlang:error(timeout);
{Ref, [#{payload := P} = Msg]} -> {Ref, [#{payload := P} = Msg]} ->
case I =:= binary_to_integer(P) of case binary_to_integer(P) of
true -> ok; I -> %% exact match
false -> throw({unexpected, Msg, [I | Rest]}) do_receive_and_match_messages(Ref, Rest);
end, J when J < I -> %% allow retry
do_receive_and_match_messages(Ref, Rest) do_receive_and_match_messages(Ref, Exp);
_Other ->
throw({unexpected, Msg, Exp})
end
end. end.

View File

@ -139,7 +139,8 @@ match_nums([#message{payload = P} | Rest], Nums) ->
I = binary_to_integer(P), I = binary_to_integer(P),
case Nums of case Nums of
[I | NumsLeft] -> match_nums(Rest, NumsLeft); [I | NumsLeft] -> match_nums(Rest, NumsLeft);
_ -> error({I, Nums}) [J | _] when J > I -> match_nums(Rest, Nums); %% allow retry
_ -> error([{received, I}, {expecting, Nums}])
end. end.
make_config(Ref, TestPid, Result) -> make_config(Ref, TestPid, Result) ->