Add APIs for subscription add / delete

This commit is contained in:
spring2maz 2019-02-17 13:59:53 +01:00 committed by Gilbert Wong
parent 599f5c8d4f
commit 786a6eb696
4 changed files with 131 additions and 36 deletions

View File

@ -18,7 +18,7 @@
-export_type([config/0, connection/0]).
-optional_callbacks([]).
-optional_callbacks([ensure_subscribed/3, ensure_unsubscribed/2]).
%% map fields depend on implementation
-type config() :: map().
@ -26,6 +26,8 @@
-type conn_ref() :: term().
-type batch() :: emqx_protal:batch().
-type ack_ref() :: emqx_portal:ack_ref().
-type topic() :: emqx_topic:topic().
-type qos() :: emqx_mqtt_types:qos().
-include("logger.hrl").
@ -42,6 +44,10 @@
%% called when owner is shutting down.
-callback stop(conn_ref(), connection()) -> ok.
-callback ensure_subscribed(connection(), topic(), qos()) -> ok.
-callback ensure_unsubscribed(connection(), topic()) -> ok.
start(Module, Config) ->
case Module:start(Config) of
{ok, Ref, Conn} ->

View File

@ -74,7 +74,7 @@
%% management APIs
-export([get_forwards/1, ensure_forward_present/2, ensure_forward_absent/2]).
-export([get_subscriptions/1]). %, add_subscription/3, del_subscription/2]).
-export([get_subscriptions/1, ensure_subscription_present/3, ensure_subscription_absent/2]).
-export_type([config/0,
batch/0,
@ -142,17 +142,32 @@ handle_ack(Pid, Ref) when node() =:= node(Pid) ->
-spec get_forwards(id()) -> [topic()].
get_forwards(Id) -> gen_statem:call(id(Id), get_forwards, timer:seconds(1000)).
%% @doc Return all subscriptions (subscription over mqtt connection to remote broker).
-spec get_subscriptions(id()) -> [{emqx_topic:topic(), qos()}].
get_subscriptions(Id) -> gen_statem:call(id(Id), get_subscriptions).
%% @doc Add a new forward (local topic subscription).
-spec ensure_forward_present(id(), topic()) -> ok.
ensure_forward_present(Id, Topic) ->
gen_statem:call(id(Id), {ensure_forward_present, topic(Topic)}).
gen_statem:call(id(Id), {ensure_present, forwards, topic(Topic)}).
%% @doc Ensure a forward topic is deleted.
-spec ensure_forward_absent(id(), topic()) -> ok.
ensure_forward_absent(Id, Topic) ->
gen_statem:call(id(Id), {ensure_forward_absent, topic(Topic)}).
gen_statem:call(id(Id), {ensure_absent, forwards, topic(Topic)}).
-spec get_subscriptions(id()) -> [{emqx_topic:topic(), qos()}].
get_subscriptions(Id) -> gen_statem:call(id(Id), get_subscriptions).
%% @doc Ensure subscribed to remote topic.
%% NOTE: only applicable when connection module is emqx_portal_mqtt
%% return `{error, no_remote_subscription_support}' otherwise.
-spec ensure_subscription_present(id(), topic(), qos()) -> ok | {error, any()}.
ensure_subscription_present(Id, Topic, QoS) ->
gen_statem:call(id(Id), {ensure_present, subscriptions, {topic(Topic), QoS}}).
%% @doc Ensure unsubscribed from remote topic.
%% NOTE: only applicable when connection module is emqx_portal_mqtt
-spec ensure_subscription_absent(id(), topic()) -> ok.
ensure_subscription_absent(Id, Topic) ->
gen_statem:call(id(Id), {ensure_absent, subscriptions, topic(Topic)}).
callback_mode() -> [state_functions, state_enter].
@ -187,7 +202,7 @@ init(Config) ->
mountpoint,
forwards
], Config#{subscriptions => Subs}),
ConnectFun = fun() -> emqx_portal_connect:start(ConnectModule, ConnectConfig) end,
ConnectFun = fun(SubsX) -> emqx_portal_connect:start(ConnectModule, ConnectConfig#{subscriptions := SubsX}) end,
{ok, connecting,
#{connect_module => ConnectModule,
connect_fun => ConnectFun,
@ -217,8 +232,10 @@ connecting(enter, connected, #{reconnect_delay_ms := Timeout}) ->
Action = {state_timeout, Timeout, reconnect},
{keep_state_and_data, Action};
connecting(enter, connecting, #{reconnect_delay_ms := Timeout,
connect_fun := ConnectFun} = State) ->
case ConnectFun() of
connect_fun := ConnectFun,
subscriptions := Subs
} = State) ->
case ConnectFun(Subs) of
{ok, ConnRef, Conn} ->
Action = {state_timeout, 0, connected},
{keep_state, State#{conn_ref => ConnRef, connection => Conn}, Action};
@ -277,7 +294,7 @@ connected(info, {batch_ack, Ref}, State) ->
%% try re-connect then re-send
{next_state, connecting, disconnect(State)};
{ok, NewState} ->
{keep_state, NewState}
{keep_state, NewState, ?maybe_send}
end;
connected(Type, Content, State) ->
common(connected, Type, Content, State).
@ -285,28 +302,14 @@ connected(Type, Content, State) ->
%% Common handlers
common(_StateName, {call, From}, get_forwards, #{forwards := Forwards}) ->
{keep_state_and_data, [{reply, From, Forwards}]};
common(_StateName, {call, From}, {ensure_forward_present, Topic},
#{forwards := Forwards} = State) ->
case lists:member(Topic, Forwards) of
true ->
{keep_state_and_data, [{reply, From, ok}]};
false ->
ok = subscribe_local_topic(Topic),
{keep_state, State#{forwards := lists:usort([Topic | Forwards])},
[{reply, From, ok}]}
end;
common(_StateName, {call, From}, {ensure_forward_absent, Topic},
#{forwards := Forwards} = State) ->
case lists:member(Topic, Forwards) of
true ->
emqx_broker:unsubscribe(Topic),
{keep_state, State#{forwards := lists:delete(Topic, Forwards)},
[{reply, From, ok}]};
false ->
{keep_state_and_data, [{reply, From, ok}]}
end;
common(_StateName, {call, From}, get_subscriptions, #{subscriptions := Subs}) ->
{keep_state_and_data, [{reply, From, Subs}]};
common(_StateName, {call, From}, {ensure_present, What, Topic}, State) ->
{Result, NewState} = ensure_present(What, Topic, State),
{keep_state, NewState, [{reply, From, Result}]};
common(_StateName, {call, From}, {ensure_absent, What, Topic}, State) ->
{Result, NewState} = ensure_absent(What, Topic, State),
{keep_state, NewState, [{reply, From, Result}]};
common(_StateName, info, {dispatch, _, Msg},
#{replayq := Q} = State) ->
NewQ = replayq:append(Q, collect([Msg])),
@ -316,6 +319,53 @@ common(StateName, Type, Content, State) ->
[name(), Type, StateName, Content]),
{keep_state, State}.
ensure_present(Key, Topic, State) ->
Topics = maps:get(Key, State),
case is_topic_present(Topic, Topics) of
true ->
{ok, State};
false ->
R = do_ensure_present(Key, Topic, State),
{R, State#{Key := lists:usort([Topic | Topics])}}
end.
ensure_absent(Key, Topic, State) ->
Topics = maps:get(Key, State),
case is_topic_present(Topic, Topics) of
true ->
R = do_ensure_absent(Key, Topic, State),
{R, State#{Key := ensure_topic_absent(Topic, Topics)}};
false ->
{ok, State}
end.
ensure_topic_absent(_Topic, []) -> [];
ensure_topic_absent(Topic, [{_, _} | _] = L) -> lists:keydelete(Topic, 1, L);
ensure_topic_absent(Topic, L) -> lists:delete(Topic, L).
is_topic_present({Topic, _QoS}, Topics) ->
is_topic_present(Topic, Topics);
is_topic_present(Topic, Topics) ->
lists:member(Topic, Topics) orelse false =/= lists:keyfind(Topic, 1, Topics).
do_ensure_present(forwards, Topic, _) ->
ok = subscribe_local_topic(Topic);
do_ensure_present(subscriptions, {Topic, QoS},
#{connect_module := ConnectModule, connection := Conn}) ->
case erlang:function_exported(ConnectModule, ensure_subscribed, 3) of
true -> ConnectModule:ensure_subscribed(Conn, Topic, QoS);
false -> {error, no_remote_subscription_support}
end.
do_ensure_absent(forwards, Topic, _) ->
ok = emqx_broker:unsubscribe(Topic);
do_ensure_absent(subscriptions, Topic, #{connect_module := ConnectModule,
connection := Conn}) ->
case erlang:function_exported(ConnectModule, ensure_unsubscribed, 2) of
true -> ConnectModule:ensure_unsubscribed(Conn, Topic);
false -> {error, no_remote_subscription_support}
end.
collect(Acc) ->
receive
{dispatch, _, Msg} ->

View File

@ -23,6 +23,11 @@
stop/2
]).
%% optional behaviour callbacks
-export([ensure_subscribed/3,
ensure_unsubscribed/2
]).
-include("emqx_mqtt.hrl").
-define(ACK_REF(ClientPid, PktId), {ClientPid, PktId}).
@ -66,6 +71,18 @@ stop(Ref, #{ack_collector := AckCollector, client_pid := Pid}) ->
safe_stop(Pid, fun() -> emqx_client:stop(Pid) end, 1000),
ok.
ensure_subscribed(#{client_pid := Pid}, Topic, QoS) when is_pid(Pid) ->
emqx_client:subscribe(Pid, Topic, QoS);
ensure_subscribed(_Conn, _Topic, _QoS) ->
%% return ok for now, next re-connect should should call start with new topic added to config
ok.
ensure_unsubscribed(#{client_pid := Pid}, Topic) when is_pid(Pid) ->
emqx_client:unsubscribe(Pid, Topic);
ensure_unsubscribed(_, _) ->
%% return ok for now, next re-connect should should call start with this topic deleted from config
ok.
safe_stop(Pid, StopF, Timeout) ->
MRef = monitor(process, Pid),
unlink(Pid),

View File

@ -17,7 +17,7 @@
-export([all/0, init_per_suite/1, end_per_suite/1]).
-export([t_rpc/1,
t_mqtt/1,
t_forwards_mngr/1
t_mngr/1
]).
-include_lib("eunit/include/eunit.hrl").
@ -29,7 +29,7 @@
all() -> [t_rpc,
t_mqtt,
t_forwards_mngr
t_mngr
].
init_per_suite(Config) ->
@ -44,7 +44,7 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
t_forwards_mngr(Config) when is_list(Config) ->
t_mngr(Config) when is_list(Config) ->
Subs = [{<<"a">>, 1}, {<<"b">>, 2}],
Cfg = #{address => node(),
forwards => [<<"mngr">>],
@ -62,6 +62,10 @@ t_forwards_mngr(Config) when is_list(Config) ->
?assertEqual(ok, emqx_portal:ensure_forward_absent(Name, "mngr2")),
?assertEqual(ok, emqx_portal:ensure_forward_absent(Name, "mngr3")),
?assertEqual([<<"mngr">>], emqx_portal:get_forwards(Pid)),
?assertEqual({error, no_remote_subscription_support},
emqx_portal:ensure_subscription_present(Pid, <<"t">>, 0)),
?assertEqual({error, no_remote_subscription_support},
emqx_portal:ensure_subscription_absent(Pid, <<"t">>)),
?assertEqual(Subs, emqx_portal:get_subscriptions(Pid))
after
ok = emqx_portal:stop(Pid)
@ -93,10 +97,16 @@ t_rpc(Config) when is_list(Config) ->
ok = emqx_portal:stop(Pid)
end.
%% Full data loopback flow explained:
%% test-pid ---> mock-cleint ----> local-broker ---(local-subscription)--->
%% portal(export) --- (mqtt-connection)--> local-broker ---(remote-subscription) -->
%% portal(import) --(mecked message sending)--> test-pid
t_mqtt(Config) when is_list(Config) ->
SendToTopic = <<"t_mqtt/one">>,
SendToTopic2 = <<"t_mqtt/two">>,
Mountpoint = <<"forwarded/${node}/">>,
ForwardedTopic = emqx_topic:join(["forwarded", atom_to_list(node()), SendToTopic]),
ForwardedTopic2 = emqx_topic:join(["forwarded", atom_to_list(node()), SendToTopic2]),
Cfg = #{address => "127.0.0.1:1883",
forwards => [SendToTopic],
connect_module => emqx_portal_mqtt,
@ -118,7 +128,7 @@ t_mqtt(Config) when is_list(Config) ->
start_type => manual,
%% Consume back to forwarded message for verification
%% NOTE: this is a indefenite loopback without mocking emqx_portal:import_batch/2
subscriptions => [{ForwardedTopic, 1}]
subscriptions => [{ForwardedTopic, _QoS = 1}]
},
Tester = self(),
Ref = make_ref(),
@ -131,15 +141,27 @@ t_mqtt(Config) when is_list(Config) ->
{ok, Pid} = emqx_portal:start_link(?FUNCTION_NAME, Cfg),
ClientId = <<"client-1">>,
try
?assertEqual([{ForwardedTopic, 1}], emqx_portal:get_subscriptions(Pid)),
emqx_portal:ensure_subscription_present(Pid, ForwardedTopic2, _QoS = 1),
?assertEqual([{ForwardedTopic, 1},
{ForwardedTopic2, 1}], emqx_portal:get_subscriptions(Pid)),
{ok, ConnPid} = emqx_mock_client:start_link(ClientId),
{ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal),
%% message from a different client, to avoid getting terminated by no-local
Msgs = lists:seq(1, 10),
Max = 100,
Msgs = lists:seq(1, Max),
lists:foreach(fun(I) ->
Msg = emqx_message:make(<<"client-2">>, ?QOS_1, SendToTopic, integer_to_binary(I)),
emqx_session:publish(SPid, I, Msg)
end, Msgs),
ok = receive_and_match_messages(Ref, Msgs),
ok = emqx_portal:ensure_forward_present(Pid, SendToTopic2),
Msgs2 = lists:seq(Max + 1, Max * 2),
lists:foreach(fun(I) ->
Msg = emqx_message:make(<<"client-2">>, ?QOS_1, SendToTopic2, integer_to_binary(I)),
emqx_session:publish(SPid, I, Msg)
end, Msgs2),
ok = receive_and_match_messages(Ref, Msgs2),
emqx_mock_client:close_session(ConnPid)
after
ok = emqx_portal:stop(Pid),
@ -147,7 +169,7 @@ t_mqtt(Config) when is_list(Config) ->
end.
receive_and_match_messages(Ref, Msgs) ->
TRef = erlang:send_after(timer:seconds(4), self(), {Ref, timeout}),
TRef = erlang:send_after(timer:seconds(5), self(), {Ref, timeout}),
try
do_receive_and_match_messages(Ref, Msgs)
after