Add get_forwards and get_subscriptions protal APIs

This commit is contained in:
spring2maz 2019-02-15 17:06:02 +01:00 committed by Gilbert Wong
parent 67376727c9
commit 9e78c18681
4 changed files with 56 additions and 11 deletions

View File

@ -31,7 +31,7 @@
-type(pubsub() :: publish | subscribe).
-type(topic() :: binary()).
-type(subid() :: binary() | atom()).
-type(subopts() :: #{qos := integer(),
-type(subopts() :: #{qos := eqmx_mqtt_types:qos(),
share => binary(),
atom() => term()
}).

View File

@ -41,7 +41,7 @@
%% | | |
%% '--(1)---'--------(3)------'
%%
%% (1): timeout
%% (1): retry timeout
%% (2): successfuly connected to remote node/cluster
%% (3): received {disconnected, conn_ref(), Reason} OR
%% failed to send to remote node/cluster.
@ -72,11 +72,17 @@
%% state functions
-export([connecting/3, connected/3]).
%% management APIs
-export([get_forwards/1]). %, add_forward/2, del_forward/2]).
-export([get_subscriptions/1]). %, add_subscription/3, del_subscription/2]).
-export_type([config/0,
batch/0,
ack_ref/0
]).
-type id() :: atom() | string() | pid().
-type qos() :: emqx_mqtt_types:qos().
-type config() :: map().
-type batch() :: [emqx_portal_msg:exp_msg()].
-type ack_ref() :: term().
@ -131,6 +137,11 @@ handle_ack(Pid, Ref) when node() =:= node(Pid) ->
Pid ! {batch_ack, Ref},
ok.
-spec get_forwards(id()) -> [emqx_topic:topic()].
get_forwards(Id) -> gen_statem:call(id(Id), get_forwards).
-spec get_subscriptions(id()) -> [{emqx_topic:topic(), qos()}].
get_subscriptions(Id) -> gen_statem:call(id(Id), get_subscriptions).
callback_mode() -> [state_functions, state_enter].
@ -150,7 +161,12 @@ init(Config) ->
end,
Queue = replayq:open(QueueConfig#{sizer => fun emqx_portal_msg:estimate_size/1,
marshaller => fun msg_marshaller/1}),
Topics = Get(forwards, []),
Topics = lists:sort([iolist_to_binary(T) || T <- Get(forwards, [])]),
Subs = lists:keysort(1, lists:map(fun({T0, QoS}) ->
T = iolist_to_binary(T0),
true = emqx_topic:validate({filter, T}),
{T, QoS}
end, Get(subscriptions, []))),
ok = subscribe_local_topics(Topics),
ConnectModule = maps:get(connect_module, Config),
ConnectConfig = maps:without([connect_module,
@ -159,7 +175,7 @@ init(Config) ->
max_inflight_batches,
mountpoint,
forwards
], Config),
], Config#{subscriptions => Subs}),
ConnectFun = fun() -> emqx_portal_connect:start(ConnectModule, ConnectConfig) end,
{ok, connecting,
#{connect_module => ConnectModule,
@ -170,6 +186,7 @@ init(Config) ->
max_inflight_batches => Get(max_inflight_batches, ?DEFAULT_SEND_AHEAD),
mountpoint => format_mountpoint(Get(mountpoint, undefined)),
topics => Topics,
subscriptions => Subs,
replayq => Queue,
inflight => []
}}.
@ -255,6 +272,10 @@ connected(Type, Content, State) ->
common(connected, Type, Content, State).
%% Common handlers
common(_StateName, {call, From}, get_forwards, #{forwards := Forwards}) ->
{keep_state_and_data, [{reply, From, Forwards}]};
common(_StateName, {call, From}, get_subscriptions, #{subscriptions := Subs}) ->
{keep_state_and_data, [{reply, From, Subs}]};
common(_StateName, info, {dispatch, _, Msg},
#{replayq := Q} = State) ->
NewQ = replayq:append(Q, collect([Msg])),
@ -363,3 +384,6 @@ name() -> {_, Name} = process_info(self(), registered_name), Name.
name(Id) -> list_to_atom(lists:concat([?MODULE, "_", Id])).
id(Pid) when is_pid(Pid) -> Pid;
id(Name) -> name(Name).

View File

@ -149,9 +149,10 @@ make_hdlr(Parent, AckCollector, Ref) ->
}.
subscribe_remote_topics(ClientPid, Subscriptions) ->
[case emqx_client:subscribe(ClientPid, {bin(Topic), Qos}) of
{ok, _, _} -> ok;
Error -> throw(Error)
end || {Topic, Qos} <- Subscriptions, emqx_topic:validate({filter, bin(Topic)})].
lists:foreach(fun({Topic, Qos}) ->
case emqx_client:subscribe(ClientPid, Topic, Qos) of
{ok, _, _} -> ok;
Error -> throw(Error)
end
end, Subscriptions).
bin(L) -> iolist_to_binary(L).

View File

@ -16,7 +16,8 @@
-export([all/0, init_per_suite/1, end_per_suite/1]).
-export([t_rpc/1,
t_mqtt/1
t_mqtt/1,
t_forwards_mngr/1
]).
-include_lib("eunit/include/eunit.hrl").
@ -27,7 +28,8 @@
-define(wait(For, Timeout), emqx_ct_helpers:wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)).
all() -> [t_rpc,
t_mqtt
t_mqtt,
t_forwards_mngr
].
init_per_suite(Config) ->
@ -43,6 +45,24 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
t_forwards_mngr(Config) when is_list(Config) ->
Subs = [{<<"a">>, 1}, {<<"b">>, 2}],
Cfg = #{address => node(),
forwards => [<<"mngr">>],
connect_module => emqx_portal_rpc,
mountpoint => <<"forwarded">>,
subscriptions => Subs
},
Name = ?FUNCTION_NAME,
{ok, Pid} = emqx_portal:start_link(Name, Cfg),
try
?assertEqual([<<"mngr">>], emqx_portal:get_forwards(Name)),
?assertEqual(Subs, emqx_portal:get_subscriptions(Pid))
after
ok = emqx_portal:stop(Pid)
end.
%% A loopback RPC to local node
t_rpc(Config) when is_list(Config) ->
Cfg = #{address => node(),