diff --git a/src/emqx_portal_connect.erl b/src/emqx_portal_connect.erl index ab3a3f5b8..79ec077e8 100644 --- a/src/emqx_portal_connect.erl +++ b/src/emqx_portal_connect.erl @@ -18,7 +18,7 @@ -export_type([config/0, connection/0]). --optional_callbacks([]). +-optional_callbacks([ensure_subscribed/3, ensure_unsubscribed/2]). %% map fields depend on implementation -type config() :: map(). @@ -26,6 +26,8 @@ -type conn_ref() :: term(). -type batch() :: emqx_protal:batch(). -type ack_ref() :: emqx_portal:ack_ref(). +-type topic() :: emqx_topic:topic(). +-type qos() :: emqx_mqtt_types:qos(). -include("logger.hrl"). @@ -42,6 +44,10 @@ %% called when owner is shutting down. -callback stop(conn_ref(), connection()) -> ok. +-callback ensure_subscribed(connection(), topic(), qos()) -> ok. + +-callback ensure_unsubscribed(connection(), topic()) -> ok. + start(Module, Config) -> case Module:start(Config) of {ok, Ref, Conn} -> diff --git a/src/portal/emqx_portal.erl b/src/portal/emqx_portal.erl index 508fb81a9..eb5f86754 100644 --- a/src/portal/emqx_portal.erl +++ b/src/portal/emqx_portal.erl @@ -74,7 +74,7 @@ %% management APIs -export([get_forwards/1, ensure_forward_present/2, ensure_forward_absent/2]). --export([get_subscriptions/1]). %, add_subscription/3, del_subscription/2]). +-export([get_subscriptions/1, ensure_subscription_present/3, ensure_subscription_absent/2]). -export_type([config/0, batch/0, @@ -142,17 +142,32 @@ handle_ack(Pid, Ref) when node() =:= node(Pid) -> -spec get_forwards(id()) -> [topic()]. get_forwards(Id) -> gen_statem:call(id(Id), get_forwards, timer:seconds(1000)). +%% @doc Return all subscriptions (subscription over mqtt connection to remote broker). +-spec get_subscriptions(id()) -> [{emqx_topic:topic(), qos()}]. +get_subscriptions(Id) -> gen_statem:call(id(Id), get_subscriptions). + %% @doc Add a new forward (local topic subscription). -spec ensure_forward_present(id(), topic()) -> ok. ensure_forward_present(Id, Topic) -> - gen_statem:call(id(Id), {ensure_forward_present, topic(Topic)}). + gen_statem:call(id(Id), {ensure_present, forwards, topic(Topic)}). +%% @doc Ensure a forward topic is deleted. -spec ensure_forward_absent(id(), topic()) -> ok. ensure_forward_absent(Id, Topic) -> - gen_statem:call(id(Id), {ensure_forward_absent, topic(Topic)}). + gen_statem:call(id(Id), {ensure_absent, forwards, topic(Topic)}). --spec get_subscriptions(id()) -> [{emqx_topic:topic(), qos()}]. -get_subscriptions(Id) -> gen_statem:call(id(Id), get_subscriptions). +%% @doc Ensure subscribed to remote topic. +%% NOTE: only applicable when connection module is emqx_portal_mqtt +%% return `{error, no_remote_subscription_support}' otherwise. +-spec ensure_subscription_present(id(), topic(), qos()) -> ok | {error, any()}. +ensure_subscription_present(Id, Topic, QoS) -> + gen_statem:call(id(Id), {ensure_present, subscriptions, {topic(Topic), QoS}}). + +%% @doc Ensure unsubscribed from remote topic. +%% NOTE: only applicable when connection module is emqx_portal_mqtt +-spec ensure_subscription_absent(id(), topic()) -> ok. +ensure_subscription_absent(Id, Topic) -> + gen_statem:call(id(Id), {ensure_absent, subscriptions, topic(Topic)}). callback_mode() -> [state_functions, state_enter]. @@ -187,7 +202,7 @@ init(Config) -> mountpoint, forwards ], Config#{subscriptions => Subs}), - ConnectFun = fun() -> emqx_portal_connect:start(ConnectModule, ConnectConfig) end, + ConnectFun = fun(SubsX) -> emqx_portal_connect:start(ConnectModule, ConnectConfig#{subscriptions := SubsX}) end, {ok, connecting, #{connect_module => ConnectModule, connect_fun => ConnectFun, @@ -217,8 +232,10 @@ connecting(enter, connected, #{reconnect_delay_ms := Timeout}) -> Action = {state_timeout, Timeout, reconnect}, {keep_state_and_data, Action}; connecting(enter, connecting, #{reconnect_delay_ms := Timeout, - connect_fun := ConnectFun} = State) -> - case ConnectFun() of + connect_fun := ConnectFun, + subscriptions := Subs + } = State) -> + case ConnectFun(Subs) of {ok, ConnRef, Conn} -> Action = {state_timeout, 0, connected}, {keep_state, State#{conn_ref => ConnRef, connection => Conn}, Action}; @@ -277,7 +294,7 @@ connected(info, {batch_ack, Ref}, State) -> %% try re-connect then re-send {next_state, connecting, disconnect(State)}; {ok, NewState} -> - {keep_state, NewState} + {keep_state, NewState, ?maybe_send} end; connected(Type, Content, State) -> common(connected, Type, Content, State). @@ -285,28 +302,14 @@ connected(Type, Content, State) -> %% Common handlers common(_StateName, {call, From}, get_forwards, #{forwards := Forwards}) -> {keep_state_and_data, [{reply, From, Forwards}]}; -common(_StateName, {call, From}, {ensure_forward_present, Topic}, - #{forwards := Forwards} = State) -> - case lists:member(Topic, Forwards) of - true -> - {keep_state_and_data, [{reply, From, ok}]}; - false -> - ok = subscribe_local_topic(Topic), - {keep_state, State#{forwards := lists:usort([Topic | Forwards])}, - [{reply, From, ok}]} - end; -common(_StateName, {call, From}, {ensure_forward_absent, Topic}, - #{forwards := Forwards} = State) -> - case lists:member(Topic, Forwards) of - true -> - emqx_broker:unsubscribe(Topic), - {keep_state, State#{forwards := lists:delete(Topic, Forwards)}, - [{reply, From, ok}]}; - false -> - {keep_state_and_data, [{reply, From, ok}]} - end; common(_StateName, {call, From}, get_subscriptions, #{subscriptions := Subs}) -> {keep_state_and_data, [{reply, From, Subs}]}; +common(_StateName, {call, From}, {ensure_present, What, Topic}, State) -> + {Result, NewState} = ensure_present(What, Topic, State), + {keep_state, NewState, [{reply, From, Result}]}; +common(_StateName, {call, From}, {ensure_absent, What, Topic}, State) -> + {Result, NewState} = ensure_absent(What, Topic, State), + {keep_state, NewState, [{reply, From, Result}]}; common(_StateName, info, {dispatch, _, Msg}, #{replayq := Q} = State) -> NewQ = replayq:append(Q, collect([Msg])), @@ -316,6 +319,53 @@ common(StateName, Type, Content, State) -> [name(), Type, StateName, Content]), {keep_state, State}. +ensure_present(Key, Topic, State) -> + Topics = maps:get(Key, State), + case is_topic_present(Topic, Topics) of + true -> + {ok, State}; + false -> + R = do_ensure_present(Key, Topic, State), + {R, State#{Key := lists:usort([Topic | Topics])}} + end. + +ensure_absent(Key, Topic, State) -> + Topics = maps:get(Key, State), + case is_topic_present(Topic, Topics) of + true -> + R = do_ensure_absent(Key, Topic, State), + {R, State#{Key := ensure_topic_absent(Topic, Topics)}}; + false -> + {ok, State} + end. + +ensure_topic_absent(_Topic, []) -> []; +ensure_topic_absent(Topic, [{_, _} | _] = L) -> lists:keydelete(Topic, 1, L); +ensure_topic_absent(Topic, L) -> lists:delete(Topic, L). + +is_topic_present({Topic, _QoS}, Topics) -> + is_topic_present(Topic, Topics); +is_topic_present(Topic, Topics) -> + lists:member(Topic, Topics) orelse false =/= lists:keyfind(Topic, 1, Topics). + +do_ensure_present(forwards, Topic, _) -> + ok = subscribe_local_topic(Topic); +do_ensure_present(subscriptions, {Topic, QoS}, + #{connect_module := ConnectModule, connection := Conn}) -> + case erlang:function_exported(ConnectModule, ensure_subscribed, 3) of + true -> ConnectModule:ensure_subscribed(Conn, Topic, QoS); + false -> {error, no_remote_subscription_support} + end. + +do_ensure_absent(forwards, Topic, _) -> + ok = emqx_broker:unsubscribe(Topic); +do_ensure_absent(subscriptions, Topic, #{connect_module := ConnectModule, + connection := Conn}) -> + case erlang:function_exported(ConnectModule, ensure_unsubscribed, 2) of + true -> ConnectModule:ensure_unsubscribed(Conn, Topic); + false -> {error, no_remote_subscription_support} + end. + collect(Acc) -> receive {dispatch, _, Msg} -> diff --git a/src/portal/emqx_portal_mqtt.erl b/src/portal/emqx_portal_mqtt.erl index ba7461943..0817e3fe2 100644 --- a/src/portal/emqx_portal_mqtt.erl +++ b/src/portal/emqx_portal_mqtt.erl @@ -23,6 +23,11 @@ stop/2 ]). +%% optional behaviour callbacks +-export([ensure_subscribed/3, + ensure_unsubscribed/2 + ]). + -include("emqx_mqtt.hrl"). -define(ACK_REF(ClientPid, PktId), {ClientPid, PktId}). @@ -66,6 +71,18 @@ stop(Ref, #{ack_collector := AckCollector, client_pid := Pid}) -> safe_stop(Pid, fun() -> emqx_client:stop(Pid) end, 1000), ok. +ensure_subscribed(#{client_pid := Pid}, Topic, QoS) when is_pid(Pid) -> + emqx_client:subscribe(Pid, Topic, QoS); +ensure_subscribed(_Conn, _Topic, _QoS) -> + %% return ok for now, next re-connect should should call start with new topic added to config + ok. + +ensure_unsubscribed(#{client_pid := Pid}, Topic) when is_pid(Pid) -> + emqx_client:unsubscribe(Pid, Topic); +ensure_unsubscribed(_, _) -> + %% return ok for now, next re-connect should should call start with this topic deleted from config + ok. + safe_stop(Pid, StopF, Timeout) -> MRef = monitor(process, Pid), unlink(Pid), diff --git a/test/emqx_portal_SUITE.erl b/test/emqx_portal_SUITE.erl index fb851c400..c375ee994 100644 --- a/test/emqx_portal_SUITE.erl +++ b/test/emqx_portal_SUITE.erl @@ -17,7 +17,7 @@ -export([all/0, init_per_suite/1, end_per_suite/1]). -export([t_rpc/1, t_mqtt/1, - t_forwards_mngr/1 + t_mngr/1 ]). -include_lib("eunit/include/eunit.hrl"). @@ -29,7 +29,7 @@ all() -> [t_rpc, t_mqtt, - t_forwards_mngr + t_mngr ]. init_per_suite(Config) -> @@ -44,7 +44,7 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_broker_helpers:run_teardown_steps(). -t_forwards_mngr(Config) when is_list(Config) -> +t_mngr(Config) when is_list(Config) -> Subs = [{<<"a">>, 1}, {<<"b">>, 2}], Cfg = #{address => node(), forwards => [<<"mngr">>], @@ -62,6 +62,10 @@ t_forwards_mngr(Config) when is_list(Config) -> ?assertEqual(ok, emqx_portal:ensure_forward_absent(Name, "mngr2")), ?assertEqual(ok, emqx_portal:ensure_forward_absent(Name, "mngr3")), ?assertEqual([<<"mngr">>], emqx_portal:get_forwards(Pid)), + ?assertEqual({error, no_remote_subscription_support}, + emqx_portal:ensure_subscription_present(Pid, <<"t">>, 0)), + ?assertEqual({error, no_remote_subscription_support}, + emqx_portal:ensure_subscription_absent(Pid, <<"t">>)), ?assertEqual(Subs, emqx_portal:get_subscriptions(Pid)) after ok = emqx_portal:stop(Pid) @@ -93,10 +97,16 @@ t_rpc(Config) when is_list(Config) -> ok = emqx_portal:stop(Pid) end. +%% Full data loopback flow explained: +%% test-pid ---> mock-cleint ----> local-broker ---(local-subscription)---> +%% portal(export) --- (mqtt-connection)--> local-broker ---(remote-subscription) --> +%% portal(import) --(mecked message sending)--> test-pid t_mqtt(Config) when is_list(Config) -> SendToTopic = <<"t_mqtt/one">>, + SendToTopic2 = <<"t_mqtt/two">>, Mountpoint = <<"forwarded/${node}/">>, ForwardedTopic = emqx_topic:join(["forwarded", atom_to_list(node()), SendToTopic]), + ForwardedTopic2 = emqx_topic:join(["forwarded", atom_to_list(node()), SendToTopic2]), Cfg = #{address => "127.0.0.1:1883", forwards => [SendToTopic], connect_module => emqx_portal_mqtt, @@ -118,7 +128,7 @@ t_mqtt(Config) when is_list(Config) -> start_type => manual, %% Consume back to forwarded message for verification %% NOTE: this is a indefenite loopback without mocking emqx_portal:import_batch/2 - subscriptions => [{ForwardedTopic, 1}] + subscriptions => [{ForwardedTopic, _QoS = 1}] }, Tester = self(), Ref = make_ref(), @@ -131,15 +141,27 @@ t_mqtt(Config) when is_list(Config) -> {ok, Pid} = emqx_portal:start_link(?FUNCTION_NAME, Cfg), ClientId = <<"client-1">>, try + ?assertEqual([{ForwardedTopic, 1}], emqx_portal:get_subscriptions(Pid)), + emqx_portal:ensure_subscription_present(Pid, ForwardedTopic2, _QoS = 1), + ?assertEqual([{ForwardedTopic, 1}, + {ForwardedTopic2, 1}], emqx_portal:get_subscriptions(Pid)), {ok, ConnPid} = emqx_mock_client:start_link(ClientId), {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal), %% message from a different client, to avoid getting terminated by no-local - Msgs = lists:seq(1, 10), + Max = 100, + Msgs = lists:seq(1, Max), lists:foreach(fun(I) -> Msg = emqx_message:make(<<"client-2">>, ?QOS_1, SendToTopic, integer_to_binary(I)), emqx_session:publish(SPid, I, Msg) end, Msgs), ok = receive_and_match_messages(Ref, Msgs), + ok = emqx_portal:ensure_forward_present(Pid, SendToTopic2), + Msgs2 = lists:seq(Max + 1, Max * 2), + lists:foreach(fun(I) -> + Msg = emqx_message:make(<<"client-2">>, ?QOS_1, SendToTopic2, integer_to_binary(I)), + emqx_session:publish(SPid, I, Msg) + end, Msgs2), + ok = receive_and_match_messages(Ref, Msgs2), emqx_mock_client:close_session(ConnPid) after ok = emqx_portal:stop(Pid), @@ -147,7 +169,7 @@ t_mqtt(Config) when is_list(Config) -> end. receive_and_match_messages(Ref, Msgs) -> - TRef = erlang:send_after(timer:seconds(4), self(), {Ref, timeout}), + TRef = erlang:send_after(timer:seconds(5), self(), {Ref, timeout}), try do_receive_and_match_messages(Ref, Msgs) after