From 9e78c186811808fdde67b0998323d3031762dad6 Mon Sep 17 00:00:00 2001 From: spring2maz Date: Fri, 15 Feb 2019 17:06:02 +0100 Subject: [PATCH] Add get_forwards and get_subscriptions protal APIs --- src/emqx_types.erl | 2 +- src/portal/emqx_portal.erl | 30 +++++++++++++++++++++++++++--- src/portal/emqx_portal_mqtt.erl | 11 ++++++----- test/emqx_portal_SUITE.erl | 24 ++++++++++++++++++++++-- 4 files changed, 56 insertions(+), 11 deletions(-) diff --git a/src/emqx_types.erl b/src/emqx_types.erl index d84b1099a..904e1df91 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -31,7 +31,7 @@ -type(pubsub() :: publish | subscribe). -type(topic() :: binary()). -type(subid() :: binary() | atom()). --type(subopts() :: #{qos := integer(), +-type(subopts() :: #{qos := eqmx_mqtt_types:qos(), share => binary(), atom() => term() }). diff --git a/src/portal/emqx_portal.erl b/src/portal/emqx_portal.erl index 51ce4a4dc..238b6681f 100644 --- a/src/portal/emqx_portal.erl +++ b/src/portal/emqx_portal.erl @@ -41,7 +41,7 @@ %% | | | %% '--(1)---'--------(3)------' %% -%% (1): timeout +%% (1): retry timeout %% (2): successfuly connected to remote node/cluster %% (3): received {disconnected, conn_ref(), Reason} OR %% failed to send to remote node/cluster. @@ -72,11 +72,17 @@ %% state functions -export([connecting/3, connected/3]). +%% management APIs +-export([get_forwards/1]). %, add_forward/2, del_forward/2]). +-export([get_subscriptions/1]). %, add_subscription/3, del_subscription/2]). + -export_type([config/0, batch/0, ack_ref/0 ]). +-type id() :: atom() | string() | pid(). +-type qos() :: emqx_mqtt_types:qos(). -type config() :: map(). -type batch() :: [emqx_portal_msg:exp_msg()]. -type ack_ref() :: term(). @@ -131,6 +137,11 @@ handle_ack(Pid, Ref) when node() =:= node(Pid) -> Pid ! {batch_ack, Ref}, ok. +-spec get_forwards(id()) -> [emqx_topic:topic()]. +get_forwards(Id) -> gen_statem:call(id(Id), get_forwards). + +-spec get_subscriptions(id()) -> [{emqx_topic:topic(), qos()}]. +get_subscriptions(Id) -> gen_statem:call(id(Id), get_subscriptions). callback_mode() -> [state_functions, state_enter]. @@ -150,7 +161,12 @@ init(Config) -> end, Queue = replayq:open(QueueConfig#{sizer => fun emqx_portal_msg:estimate_size/1, marshaller => fun msg_marshaller/1}), - Topics = Get(forwards, []), + Topics = lists:sort([iolist_to_binary(T) || T <- Get(forwards, [])]), + Subs = lists:keysort(1, lists:map(fun({T0, QoS}) -> + T = iolist_to_binary(T0), + true = emqx_topic:validate({filter, T}), + {T, QoS} + end, Get(subscriptions, []))), ok = subscribe_local_topics(Topics), ConnectModule = maps:get(connect_module, Config), ConnectConfig = maps:without([connect_module, @@ -159,7 +175,7 @@ init(Config) -> max_inflight_batches, mountpoint, forwards - ], Config), + ], Config#{subscriptions => Subs}), ConnectFun = fun() -> emqx_portal_connect:start(ConnectModule, ConnectConfig) end, {ok, connecting, #{connect_module => ConnectModule, @@ -170,6 +186,7 @@ init(Config) -> max_inflight_batches => Get(max_inflight_batches, ?DEFAULT_SEND_AHEAD), mountpoint => format_mountpoint(Get(mountpoint, undefined)), topics => Topics, + subscriptions => Subs, replayq => Queue, inflight => [] }}. @@ -255,6 +272,10 @@ connected(Type, Content, State) -> common(connected, Type, Content, State). %% Common handlers +common(_StateName, {call, From}, get_forwards, #{forwards := Forwards}) -> + {keep_state_and_data, [{reply, From, Forwards}]}; +common(_StateName, {call, From}, get_subscriptions, #{subscriptions := Subs}) -> + {keep_state_and_data, [{reply, From, Subs}]}; common(_StateName, info, {dispatch, _, Msg}, #{replayq := Q} = State) -> NewQ = replayq:append(Q, collect([Msg])), @@ -363,3 +384,6 @@ name() -> {_, Name} = process_info(self(), registered_name), Name. name(Id) -> list_to_atom(lists:concat([?MODULE, "_", Id])). +id(Pid) when is_pid(Pid) -> Pid; +id(Name) -> name(Name). + diff --git a/src/portal/emqx_portal_mqtt.erl b/src/portal/emqx_portal_mqtt.erl index f01633111..ba7461943 100644 --- a/src/portal/emqx_portal_mqtt.erl +++ b/src/portal/emqx_portal_mqtt.erl @@ -149,9 +149,10 @@ make_hdlr(Parent, AckCollector, Ref) -> }. subscribe_remote_topics(ClientPid, Subscriptions) -> - [case emqx_client:subscribe(ClientPid, {bin(Topic), Qos}) of - {ok, _, _} -> ok; - Error -> throw(Error) - end || {Topic, Qos} <- Subscriptions, emqx_topic:validate({filter, bin(Topic)})]. + lists:foreach(fun({Topic, Qos}) -> + case emqx_client:subscribe(ClientPid, Topic, Qos) of + {ok, _, _} -> ok; + Error -> throw(Error) + end + end, Subscriptions). -bin(L) -> iolist_to_binary(L). diff --git a/test/emqx_portal_SUITE.erl b/test/emqx_portal_SUITE.erl index f8ca04ea2..50d438f16 100644 --- a/test/emqx_portal_SUITE.erl +++ b/test/emqx_portal_SUITE.erl @@ -16,7 +16,8 @@ -export([all/0, init_per_suite/1, end_per_suite/1]). -export([t_rpc/1, - t_mqtt/1 + t_mqtt/1, + t_forwards_mngr/1 ]). -include_lib("eunit/include/eunit.hrl"). @@ -27,7 +28,8 @@ -define(wait(For, Timeout), emqx_ct_helpers:wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)). all() -> [t_rpc, - t_mqtt + t_mqtt, + t_forwards_mngr ]. init_per_suite(Config) -> @@ -43,6 +45,24 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_broker_helpers:run_teardown_steps(). +t_forwards_mngr(Config) when is_list(Config) -> + Subs = [{<<"a">>, 1}, {<<"b">>, 2}], + Cfg = #{address => node(), + forwards => [<<"mngr">>], + connect_module => emqx_portal_rpc, + mountpoint => <<"forwarded">>, + subscriptions => Subs + }, + Name = ?FUNCTION_NAME, + {ok, Pid} = emqx_portal:start_link(Name, Cfg), + try + ?assertEqual([<<"mngr">>], emqx_portal:get_forwards(Name)), + ?assertEqual(Subs, emqx_portal:get_subscriptions(Pid)) + after + ok = emqx_portal:stop(Pid) + end. + + %% A loopback RPC to local node t_rpc(Config) when is_list(Config) -> Cfg = #{address => node(),