From 9dbc34c376e159963a66d3d1edec85d74713b7e9 Mon Sep 17 00:00:00 2001 From: spring2maz Date: Tue, 26 Feb 2019 08:45:15 +0100 Subject: [PATCH] Ack replayq and allow retry in tests --- src/emqx_client.erl | 23 ++++++++--- src/portal/emqx_portal.erl | 73 +++++++++++++++------------------ src/portal/emqx_portal_mqtt.erl | 6 +-- src/portal/emqx_portal_sup.erl | 14 ++++++- test/emqx_portal_SUITE.erl | 17 ++++---- test/emqx_portal_tests.erl | 3 +- 6 files changed, 77 insertions(+), 59 deletions(-) diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 987cf52d3..2b551be56 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -957,8 +957,7 @@ connected(cast, ?PACKET(?PINGRESP), State) -> end; connected(cast, ?DISCONNECT_PACKET(ReasonCode, Properties), State) -> - ok = eval_msg_handler(State, disconnected, {ReasonCode, Properties}), - {stop, disconnected, State}; + {stop, {disconnected, ReasonCode, Properties}, State}; connected(info, {timeout, _TRef, keepalive}, State = #state{force_ping = true}) -> case send(?PACKET(?PINGREQ), State) of @@ -1042,10 +1041,18 @@ handle_event(EventType, EventContent, StateName, StateData) -> {keep_state, StateData}. %% Mandatory callback functions -terminate(_Reason, _State, #state{socket = undefined}) -> - ok; -terminate(_Reason, _State, #state{socket = Socket}) -> - emqx_client_sock:close(Socket). +terminate(Reason, _StateName, State = #state{socket = Socket}) -> + case Reason of + {disconnected, ReasonCode, Properties} -> + %% backward compatible + ok = eval_msg_handler(State, disconnected, {ReasonCode, Properties}); + _ -> + ok = eval_msg_handler(State, disconnected, Reason) + end, + case Socket =:= undefined of + true -> ok; + _ -> emqx_client_sock:close(Socket) + end. code_change(_Vsn, State, Data, _Extra) -> {ok, State, Data}. @@ -1276,6 +1283,10 @@ eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER, %% Special handling for disconnected message when there is no handler callback Owner ! {disconnected, ReasonCode, Properties}, ok; +eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER}, + disconnected, _OtherReason) -> + %% do nothing to be backward compatible + ok; eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER, owner = Owner}, Kind, Msg) -> Owner ! {Kind, Msg}, diff --git a/src/portal/emqx_portal.erl b/src/portal/emqx_portal.erl index ff481f603..83a29521d 100644 --- a/src/portal/emqx_portal.erl +++ b/src/portal/emqx_portal.erl @@ -64,8 +64,7 @@ -export([start_link/2, import_batch/2, handle_ack/2, - stop/1 - ]). + stop/1]). %% gen_statem callbacks -export([terminate/3, code_change/4, init/1, callback_mode/0]). @@ -74,15 +73,13 @@ -export([standing_by/3, connecting/3, connected/3]). %% management APIs --export([start_bridge/1, stop_bridge/1, status/1]). --export([ensure_started/2, ensure_stopped/1, ensure_stopped/2]). +-export([ensure_started/1, ensure_started/2, ensure_stopped/1, ensure_stopped/2, status/1]). -export([get_forwards/1, ensure_forward_present/2, ensure_forward_absent/2]). -export([get_subscriptions/1, ensure_subscription_present/3, ensure_subscription_absent/2]). -export_type([config/0, batch/0, - ack_ref/0 - ]). + ack_ref/0]). -type id() :: atom() | string() | pid(). -type qos() :: emqx_mqtt_types:qos(). @@ -112,7 +109,7 @@ %% max_inflight_batches: Max number of batches allowed to send-ahead before %% receiving confirmation from remote node/cluster %% mountpoint: The topic mount point for messages sent to remote node/cluster -%% `undefined', `<<>>' or `""' to disalble +%% `undefined', `<<>>' or `""' to disable %% forwards: Local topics to subscribe. %% queue.batch_bytes_limit: Max number of bytes to collect in a batch for each %% send call towards emqx_portal_connect @@ -129,6 +126,9 @@ start_link(Name, Config) -> gen_statem:start_link({local, name(Name)}, ?MODULE, Config, []). %% @doc Manually start portal worker. State idempotency ensured. +ensure_started(Name) -> + gen_statem:call(name(Name), ensure_started). + ensure_started(Name, Config) -> case start_link(Name, Config) of {ok, Pid} -> {ok, Pid}; @@ -162,12 +162,6 @@ ensure_stopped(Id, Timeout) -> stop(Pid) -> gen_statem:stop(Pid). -start_bridge(Name) -> - gen_statem:call(name(Name), ensure_started). - -stop_bridge(Name) -> - gen_statem:call(name(Name), ensure_stopped). - status(Pid) -> gen_statem:call(Pid, status). @@ -279,15 +273,11 @@ standing_by(enter, _, #{start_type := manual}) -> keep_state_and_data; standing_by({call, From}, ensure_started, State) -> {next_state, connecting, State, - [{reply, From, <<"starting bridge ......">>}]}; -standing_by({call, From}, ensure_stopped, _State) -> - {keep_state_and_data, [{reply, From, <<"bridge not started">>}]}; -standing_by({call, From}, status, _State) -> - {keep_state_and_data, [{reply, From, <<"Stopped">>}]}; + [{reply, From, ok}]}; standing_by(state_timeout, do_connect, State) -> {next_state, connecting, State}; standing_by({call, From}, _Call, _State) -> - {keep_state_and_data, [{reply, From, {error, standing_by}}]}; + {keep_state_and_data, [{reply, From, {error,standing_by}}]}; standing_by(info, Info, State) -> ?INFO("Portal ~p discarded info event at state standing_by:\n~p", [name(), Info]), {keep_state_and_data, State}; @@ -308,6 +298,7 @@ connecting(enter, _, #{reconnect_delay_ms := Timeout, ok = subscribe_local_topics(Forwards), case ConnectFun(Subs) of {ok, ConnRef, Conn} -> + ?INFO("Portal ~p connected", [name()]), Action = {state_timeout, 0, connected}, {keep_state, State#{conn_ref => ConnRef, connection => Conn}, Action}; error -> @@ -318,10 +309,6 @@ connecting(state_timeout, connected, State) -> {next_state, connected, State}; connecting(state_timeout, reconnect, _State) -> repeat_state_and_data; -connecting({call, From}, status, _State) -> - {keep_state_and_data, [{reply, From, <<"Stopped">>}]}; -connecting({call, From}, _Call, _State) -> - {keep_state_and_data, [{reply, From, <<"starting bridge ......">>}]}; connecting(info, {batch_ack, Ref}, State) -> case do_ack(State, Ref) of {ok, NewState} -> @@ -329,6 +316,10 @@ connecting(info, {batch_ack, Ref}, State) -> _ -> keep_state_and_data end; +connecting(internal, maybe_send, _State) -> + keep_state_and_data; +connecting(info, {disconnected, _Ref, _Reason}, _State) -> + keep_state_and_data; connecting(Type, Content, State) -> common(connecting, Type, Content, State). @@ -353,23 +344,23 @@ connected(internal, maybe_send, State) -> {error, NewState} -> {next_state, connecting, disconnect(NewState)} end; -connected({call, From}, ensure_started, _State) -> - {keep_state_and_data, [{reply, From, <<"bridge already started">>}]}; -connected({call, From}, status, _State) -> - {keep_state_and_data, [{reply, From, <<"Running">>}]}; connected(info, {disconnected, ConnRef, Reason}, - #{conn_ref := ConnRef, connection := Conn} = State) -> - ?INFO("Portal ~p diconnected~nreason=~p", - [name(), Conn, Reason]), - {next_state, connecting, - State#{conn_ref := undefined, - connection := undefined}}; + #{conn_ref := ConnRefCurrent, connection := Conn} = State) -> + case ConnRefCurrent =:= ConnRef of + true -> + ?INFO("Portal ~p diconnected~nreason=~p", [name(), Conn, Reason]), + {next_state, connecting, + State#{conn_ref := undefined, connection := undefined}}; + false -> + keep_state_and_data + end; connected(info, {batch_ack, Ref}, State) -> case do_ack(State, Ref) of stale -> keep_state_and_data; bad_order -> %% try re-connect then re-send + ?ERROR("Bad order ack received by portal ~p", [name()]), {next_state, connecting, disconnect(State)}; {ok, NewState} -> {keep_state, NewState, ?maybe_send} @@ -378,6 +369,8 @@ connected(Type, Content, State) -> common(connected, Type, Content, State). %% Common handlers +common(StateName, {call, From}, status, _State) -> + {keep_state_and_data, [{reply, From, StateName}]}; common(_StateName, {call, From}, ensure_started, _State) -> {keep_state_and_data, [{reply, From, ok}]}; common(_StateName, {call, From}, get_forwards, #{forwards := Forwards}) -> @@ -392,13 +385,13 @@ common(_StateName, {call, From}, {ensure_absent, What, Topic}, State) -> {keep_state, NewState, [{reply, From, Result}]}; common(_StateName, {call, From}, ensure_stopped, _State) -> {stop_and_reply, {shutdown, manual}, - [{reply, From, <<"stop bridge successfully">>}]}; + [{reply, From, ok}]}; common(_StateName, info, {dispatch, _, Msg}, #{replayq := Q} = State) -> NewQ = replayq:append(Q, collect([Msg])), {keep_state, State#{replayq => NewQ}, ?maybe_send}; common(StateName, Type, Content, State) -> - ?INFO("Portal ~p discarded ~p type event at state ~p:~p", + ?INFO("Portal ~p discarded ~p type event at state ~p:\n~p", [name(), Type, StateName, Content]), {keep_state, State}. @@ -497,15 +490,16 @@ do_send(State = #{inflight := Inflight}, QAckRef, [_ | _] = Batch) -> %% this is a list of inflight BATCHes, not expecting it to be too long NewInflight = Inflight ++ [#{q_ack_ref => QAckRef, send_ack_ref => Ref, - batch => Batch - }], + batch => Batch}], {ok, State#{inflight := NewInflight}}; {error, Reason} -> ?INFO("Batch produce failed\n~p", [Reason]), {error, State} end. -do_ack(State = #{inflight := [#{send_ack_ref := Ref} | Rest]}, Ref) -> +do_ack(State = #{inflight := [#{send_ack_ref := Refx, q_ack_ref := QAckRef} | Rest], + replayq := Q}, Ref) when Refx =:= Ref -> + ok = replayq:ack(Q, QAckRef), {ok, State#{inflight := Rest}}; do_ack(#{inflight := Inflight}, Ref) -> case lists:any(fun(#{send_ack_ref := Ref0}) -> Ref0 =:= Ref end, Inflight) of @@ -533,8 +527,7 @@ disconnect(#{connection := Conn, } = State) when Conn =/= undefined -> ok = Module:stop(ConnRef, Conn), State#{conn_ref => undefined, - connection => undefined - }; + connection => undefined}; disconnect(State) -> State. %% Called only when replayq needs to dump it to disk. diff --git a/src/portal/emqx_portal_mqtt.erl b/src/portal/emqx_portal_mqtt.erl index f47a0cf5a..d466a970d 100644 --- a/src/portal/emqx_portal_mqtt.erl +++ b/src/portal/emqx_portal_mqtt.erl @@ -74,8 +74,8 @@ start(Config = #{address := Address}) -> end. stop(Ref, #{ack_collector := AckCollector, client_pid := Pid}) -> - safe_stop(AckCollector, fun() -> AckCollector ! ?STOP(Ref) end, 1000), safe_stop(Pid, fun() -> emqx_client:stop(Pid) end, 1000), + safe_stop(AckCollector, fun() -> AckCollector ! ?STOP(Ref) end, 1000), ok. ensure_subscribed(#{client_pid := Pid}, Topic, QoS) when is_pid(Pid) -> @@ -120,7 +120,7 @@ send(#{client_pid := ClientPid, ack_collector := AckCollector} = Conn, [Msg | Re {ok, PktId} -> send(Conn, Rest, [PktId | Acc]); {error, {_PacketId, inflight_full}} -> - timer:sleep(100), + timer:sleep(10), send(Conn, Batch, Acc); {error, Reason} -> %% NOTE: There is no partial sucess of a batch and recover from the middle @@ -176,7 +176,7 @@ import_msg(Msg) -> make_hdlr(Parent, AckCollector, Ref) -> #{puback => fun(Ack) -> handle_puback(AckCollector, Ack) end, publish => fun(Msg) -> import_msg(Msg) end, - disconnected => fun(RC, _Properties) -> Parent ! {disconnected, Ref, RC}, ok end + disconnected => fun(Reason) -> Parent ! {disconnected, Ref, Reason}, ok end }. subscribe_remote_topics(ClientPid, Subscriptions) -> diff --git a/src/portal/emqx_portal_sup.erl b/src/portal/emqx_portal_sup.erl index fbc292c4e..845136b7a 100644 --- a/src/portal/emqx_portal_sup.erl +++ b/src/portal/emqx_portal_sup.erl @@ -16,7 +16,7 @@ -behavior(supervisor). -export([start_link/0, start_link/1, portals/0]). - +-export([create_portal/2, drop_portal/1]). -export([init/1]). -define(SUP, ?MODULE). @@ -46,3 +46,15 @@ portal_spec({Name, Config}) -> -spec(portals() -> [{node(), map()}]). portals() -> [{Name, emqx_portal:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?SUP)]. + +create_portal(Id, Config) -> + supervisor:start_child(?SUP, portal_spec({Id, Config})). + +drop_portal(Id) -> + case supervisor:terminate_child(?SUP, Id) of + ok -> + supervisor:delete_child(?SUP, Id); + Error -> + emqx_logger:error("[Bridge] Delete bridge failed", [Error]), + Error + end. diff --git a/test/emqx_portal_SUITE.erl b/test/emqx_portal_SUITE.erl index 3d34eda50..d9e7b38a2 100644 --- a/test/emqx_portal_SUITE.erl +++ b/test/emqx_portal_SUITE.erl @@ -146,8 +146,6 @@ t_mqtt(Config) when is_list(Config) -> ?assertEqual([{ForwardedTopic, 1}], emqx_portal:get_subscriptions(Pid)), ok = emqx_portal:ensure_subscription_present(Pid, ForwardedTopic2, _QoS = 1), ok = emqx_portal:ensure_forward_present(Pid, SendToTopic2), - %% TODO: investigate why it's necessary - timer:sleep(1000), ?assertEqual([{ForwardedTopic, 1}, {ForwardedTopic2, 1}], emqx_portal:get_subscriptions(Pid)), {ok, ConnPid} = emqx_mock_client:start_link(ClientId), @@ -182,13 +180,16 @@ receive_and_match_messages(Ref, Msgs) -> ok. do_receive_and_match_messages(_Ref, []) -> ok; -do_receive_and_match_messages(Ref, [I | Rest]) -> +do_receive_and_match_messages(Ref, [I | Rest] = Exp) -> receive {Ref, timeout} -> erlang:error(timeout); {Ref, [#{payload := P} = Msg]} -> - case I =:= binary_to_integer(P) of - true -> ok; - false -> throw({unexpected, Msg, [I | Rest]}) - end, - do_receive_and_match_messages(Ref, Rest) + case binary_to_integer(P) of + I -> %% exact match + do_receive_and_match_messages(Ref, Rest); + J when J < I -> %% allow retry + do_receive_and_match_messages(Ref, Exp); + _Other -> + throw({unexpected, Msg, Exp}) + end end. diff --git a/test/emqx_portal_tests.erl b/test/emqx_portal_tests.erl index e9a3583fe..18d7ed4cd 100644 --- a/test/emqx_portal_tests.erl +++ b/test/emqx_portal_tests.erl @@ -139,7 +139,8 @@ match_nums([#message{payload = P} | Rest], Nums) -> I = binary_to_integer(P), case Nums of [I | NumsLeft] -> match_nums(Rest, NumsLeft); - _ -> error({I, Nums}) + [J | _] when J > I -> match_nums(Rest, Nums); %% allow retry + _ -> error([{received, I}, {expecting, Nums}]) end. make_config(Ref, TestPid, Result) ->