From 7efd7b3ec05382bdc55c8de19151f20af0cf3231 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Thu, 28 Feb 2019 15:31:54 +0800 Subject: [PATCH] Rename portal to bridge --- Makefile | 20 +++--- etc/emqx.conf | 4 +- priv/emqx.schema | 4 +- .../emqx_bridge.erl} | 62 +++++++++---------- .../emqx_bridge_mqtt.erl} | 10 +-- .../emqx_bridge_msg.erl} | 8 +-- .../emqx_bridge_rpc.erl} | 16 ++--- .../emqx_bridge_sup.erl} | 28 ++++----- ...al_connect.erl => emqx_bridge_connect.erl} | 6 +- src/emqx_sup.erl | 5 +- ...portal_SUITE.erl => emqx_bridge_SUITE.erl} | 60 +++++++++--------- ...t_tests.erl => emqx_bridge_mqtt_tests.erl} | 8 +-- ...pc_tests.erl => emqx_bridge_rpc_tests.erl} | 14 ++--- ...portal_tests.erl => emqx_bridge_tests.erl} | 56 ++++++++--------- test/emqx_ct_broker_helpers.erl | 6 +- 15 files changed, 153 insertions(+), 154 deletions(-) rename src/{portal/emqx_portal.erl => bridge/emqx_bridge.erl} (91%) rename src/{portal/emqx_portal_mqtt.erl => bridge/emqx_bridge_mqtt.erl} (96%) rename src/{portal/emqx_portal_msg.erl => bridge/emqx_bridge_msg.erl} (94%) rename src/{portal/emqx_portal_rpc.erl => bridge/emqx_bridge_rpc.erl} (88%) rename src/{portal/emqx_portal_sup.erl => bridge/emqx_bridge_sup.erl} (70%) rename src/{emqx_portal_connect.erl => emqx_bridge_connect.erl} (94%) rename test/{emqx_portal_SUITE.erl => emqx_bridge_SUITE.erl} (79%) rename test/{emqx_portal_mqtt_tests.erl => emqx_bridge_mqtt_tests.erl} (91%) rename test/{emqx_portal_rpc_tests.erl => emqx_bridge_rpc_tests.erl} (80%) rename test/{emqx_portal_tests.erl => emqx_bridge_tests.erl} (75%) diff --git a/Makefile b/Makefile index 9aa90f94c..35c2c0bee 100644 --- a/Makefile +++ b/Makefile @@ -27,17 +27,17 @@ TEST_ERLC_OPTS += +debug_info -DAPPLICATION=emqx EUNIT_OPTS = verbose -# CT_SUITES = emqx_frame +CT_SUITES = emqx_bridge ## emqx_trie emqx_router emqx_frame emqx_mqtt_compat -CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ - emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \ - emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \ - emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ - emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \ - emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_portal \ - emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \ - emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message +# CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ +# emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \ +# emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \ +# emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ +# emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \ +# emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \ +# emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \ +# emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message CT_NODE_NAME = emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) @@ -110,7 +110,7 @@ rebar-ct: rebar-ct-setup @rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(shell echo $(foreach var,$(CT_SUITES),test/$(var)_SUITE) | tr ' ' ',') ## Run one single CT with rebar3 -## e.g. make ct-one-suite suite=emqx_portal +## e.g. make ct-one-suite suite=emqx_bridge ct-one-suite: rebar-ct-setup @rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(suite)_SUITE diff --git a/etc/emqx.conf b/etc/emqx.conf index 2668c41a2..92d4f59b8 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1726,14 +1726,14 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G ## bridge.aws.max_inflight_batches = 32 ## Max number of messages to collect in a batch for -## each send call towards emqx_portal_connect +## each send call towards emqx_bridge_connect ## ## Value: Integer ## default: 32 ## bridge.aws.queue.batch_count_limit = 32 ## Max number of bytes to collect in a batch for each -## send call towards emqx_portal_connect +## send call towards emqx_bridge_connect ## ## Value: Bytesize ## default: 1000M diff --git a/priv/emqx.schema b/priv/emqx.schema index 4ca43f232..e490e024f 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1675,9 +1675,9 @@ end}. true when Subs =/= [] -> error({"subscriptions are not supported when bridging between emqx nodes", Name, Subs}); true -> - emqx_portal_rpc; + emqx_bridge_rpc; false -> - emqx_portal_mqtt + emqx_bridge_mqtt end end, %% to be backward compatible diff --git a/src/portal/emqx_portal.erl b/src/bridge/emqx_bridge.erl similarity index 91% rename from src/portal/emqx_portal.erl rename to src/bridge/emqx_bridge.erl index 4a6ad2437..af85df68e 100644 --- a/src/portal/emqx_portal.erl +++ b/src/bridge/emqx_bridge.erl @@ -12,18 +12,18 @@ %% See the License for the specific language governing permissions and %% limitations under the License. -%% @doc Portal works in two layers (1) batching layer (2) transport layer -%% The `portal' batching layer collects local messages in batches and sends over +%% @doc Bridge works in two layers (1) batching layer (2) transport layer +%% The `bridge' batching layer collects local messages in batches and sends over %% to remote MQTT node/cluster via `connetion' transport layer. %% In case `REMOTE' is also an EMQX node, `connection' is recommended to be -%% the `gen_rpc' based implementation `emqx_portal_rpc'. Otherwise `connection' -%% has to be `emqx_portal_mqtt'. +%% the `gen_rpc' based implementation `emqx_bridge_rpc'. Otherwise `connection' +%% has to be `emqx_bridge_mqtt'. %% %% ``` %% +------+ +--------+ %% | EMQX | | REMOTE | %% | | | | -%% | (portal) <==(connection)==> | | +%% | (bridge) <==(connection)==> | | %% | | | | %% | | | | %% +------+ +--------+ @@ -47,8 +47,8 @@ %% (3): received {disconnected, conn_ref(), Reason} OR %% failed to send to remote node/cluster. %% -%% NOTE: A portal worker may subscribe to multiple (including wildcard) -%% local topics, and the underlying `emqx_portal_connect' may subscribe to +%% NOTE: A bridge worker may subscribe to multiple (including wildcard) +%% local topics, and the underlying `emqx_bridge_connect' may subscribe to %% multiple remote topics, however, worker/connections are not designed %% to support automatic load-balancing, i.e. in case it can not keep up %% with the amount of messages comming in, administrator should split and @@ -57,7 +57,7 @@ %% NOTES: %% * Local messages are all normalised to QoS-1 when exporting to remote --module(emqx_portal). +-module(emqx_bridge). -behaviour(gen_statem). %% APIs @@ -84,7 +84,7 @@ -type id() :: atom() | string() | pid(). -type qos() :: emqx_mqtt_types:qos(). -type config() :: map(). --type batch() :: [emqx_portal_msg:exp_msg()]. +-type batch() :: [emqx_bridge_msg:exp_msg()]. -type ack_ref() :: term(). -type topic() :: emqx_topic:topic(). @@ -99,12 +99,12 @@ -define(DEFAULT_SEG_BYTES, (1 bsl 20)). -define(maybe_send, {next_event, internal, maybe_send}). -%% @doc Start a portal worker. Supported configs: -%% start_type: 'manual' (default) or 'auto', when manual, portal will stay +%% @doc Start a bridge worker. Supported configs: +%% start_type: 'manual' (default) or 'auto', when manual, bridge will stay %% at 'standing_by' state until a manual call to start it. -%% connect_module: The module which implements emqx_portal_connect behaviour +%% connect_module: The module which implements emqx_bridge_connect behaviour %% and work as message batch transport layer -%% reconnect_delay_ms: Delay in milli-seconds for the portal worker to retry +%% reconnect_delay_ms: Delay in milli-seconds for the bridge worker to retry %% in case of transportation failure. %% max_inflight_batches: Max number of batches allowed to send-ahead before %% receiving confirmation from remote node/cluster @@ -112,20 +112,20 @@ %% `undefined', `<<>>' or `""' to disable %% forwards: Local topics to subscribe. %% queue.batch_bytes_limit: Max number of bytes to collect in a batch for each -%% send call towards emqx_portal_connect +%% send call towards emqx_bridge_connect %% queue.batch_count_limit: Max number of messages to collect in a batch for -%% each send call towards emqx_portal_connect +%% each send call towards emqx_bridge_connect %% queue.replayq_dir: Directory where replayq should persist messages %% queue.replayq_seg_bytes: Size in bytes for each replayq segment file %% %% Find more connection specific configs in the callback modules -%% of emqx_portal_connect behaviour. +%% of emqx_bridge_connect behaviour. start_link(Name, Config) when is_list(Config) -> start_link(Name, maps:from_list(Config)); start_link(Name, Config) -> gen_statem:start_link({local, name(Name)}, ?MODULE, Config, []). -%% @doc Manually start portal worker. State idempotency ensured. +%% @doc Manually start bridge worker. State idempotency ensured. ensure_started(Name) -> gen_statem:call(name(Name), ensure_started). @@ -135,7 +135,7 @@ ensure_started(Name, Config) -> {error, {already_started,Pid}} -> {ok, Pid} end. -%% @doc Manually stop portal worker. State idempotency ensured. +%% @doc Manually stop bridge worker. State idempotency ensured. ensure_stopped(Id) -> ensure_stopped(Id, 1000). @@ -168,7 +168,7 @@ status(Pid) -> %% @doc This function is to be evaluated on message/batch receiver side. -spec import_batch(batch(), fun(() -> ok)) -> ok. import_batch(Batch, AckFun) -> - lists:foreach(fun emqx_broker:publish/1, emqx_portal_msg:to_broker_msgs(Batch)), + lists:foreach(fun emqx_broker:publish/1, emqx_bridge_msg:to_broker_msgs(Batch)), AckFun(). %% @doc This function is to be evaluated on message/batch exporter side @@ -197,14 +197,14 @@ ensure_forward_absent(Id, Topic) -> gen_statem:call(id(Id), {ensure_absent, forwards, topic(Topic)}). %% @doc Ensure subscribed to remote topic. -%% NOTE: only applicable when connection module is emqx_portal_mqtt +%% NOTE: only applicable when connection module is emqx_bridge_mqtt %% return `{error, no_remote_subscription_support}' otherwise. -spec ensure_subscription_present(id(), topic(), qos()) -> ok | {error, any()}. ensure_subscription_present(Id, Topic, QoS) -> gen_statem:call(id(Id), {ensure_present, subscriptions, {topic(Topic), QoS}}). %% @doc Ensure unsubscribed from remote topic. -%% NOTE: only applicable when connection module is emqx_portal_mqtt +%% NOTE: only applicable when connection module is emqx_bridge_mqtt -spec ensure_subscription_absent(id(), topic()) -> ok. ensure_subscription_absent(Id, Topic) -> gen_statem:call(id(Id), {ensure_absent, subscriptions, topic(Topic)}). @@ -225,7 +225,7 @@ init(Config) -> seg_bytes => GetQ(replayq_seg_bytes, ?DEFAULT_SEG_BYTES) } end, - Queue = replayq:open(QueueConfig#{sizer => fun emqx_portal_msg:estimate_size/1, + Queue = replayq:open(QueueConfig#{sizer => fun emqx_bridge_msg:estimate_size/1, marshaller => fun msg_marshaller/1}), Topics = lists:sort([iolist_to_binary(T) || T <- Get(forwards, [])]), Subs = lists:keysort(1, lists:map(fun({T0, QoS}) -> @@ -241,7 +241,7 @@ init(Config) -> mountpoint, forwards ], Config#{subscriptions => Subs}), - ConnectFun = fun(SubsX) -> emqx_portal_connect:start(ConnectModule, ConnectConfig#{subscriptions := SubsX}) end, + ConnectFun = fun(SubsX) -> emqx_bridge_connect:start(ConnectModule, ConnectConfig#{subscriptions := SubsX}) end, {ok, standing_by, #{connect_module => ConnectModule, connect_fun => ConnectFun, @@ -279,7 +279,7 @@ standing_by(state_timeout, do_connect, State) -> standing_by({call, From}, _Call, _State) -> {keep_state_and_data, [{reply, From, {error,standing_by}}]}; standing_by(info, Info, State) -> - ?LOG(info, "Portal ~p discarded info event at state standing_by:\n~p", [name(), Info]), + ?LOG(info, "Bridge ~p discarded info event at state standing_by:\n~p", [name(), Info]), {keep_state_and_data, State}; standing_by(Type, Content, State) -> common(standing_by, Type, Content, State). @@ -298,7 +298,7 @@ connecting(enter, _, #{reconnect_delay_ms := Timeout, ok = subscribe_local_topics(Forwards), case ConnectFun(Subs) of {ok, ConnRef, Conn} -> - ?LOG(info, "Portal ~p connected", [name()]), + ?LOG(info, "Bridge ~p connected", [name()]), Action = {state_timeout, 0, connected}, {keep_state, State#{conn_ref => ConnRef, connection => Conn}, Action}; error -> @@ -348,7 +348,7 @@ connected(info, {disconnected, ConnRef, Reason}, #{conn_ref := ConnRefCurrent, connection := Conn} = State) -> case ConnRefCurrent =:= ConnRef of true -> - ?LOG(info, "Portal ~p diconnected~nreason=~p", [name(), Conn, Reason]), + ?LOG(info, "Bridge ~p diconnected~nreason=~p", [name(), Conn, Reason]), {next_state, connecting, State#{conn_ref := undefined, connection := undefined}}; false -> @@ -360,7 +360,7 @@ connected(info, {batch_ack, Ref}, State) -> keep_state_and_data; bad_order -> %% try re-connect then re-send - ?LOG(error, "Bad order ack received by portal ~p", [name()]), + ?LOG(error, "Bad order ack received by bridge ~p", [name()]), {next_state, connecting, disconnect(State)}; {ok, NewState} -> {keep_state, NewState, ?maybe_send} @@ -391,7 +391,7 @@ common(_StateName, info, {dispatch, _, Msg}, NewQ = replayq:append(Q, collect([Msg])), {keep_state, State#{replayq => NewQ}, ?maybe_send}; common(StateName, Type, Content, State) -> - ?LOG(info, "Portal ~p discarded ~p type event at state ~p:\n~p", + ?LOG(info, "Bridge ~p discarded ~p type event at state ~p:\n~p", [name(), Type, StateName, Content]), {keep_state, State}. @@ -531,15 +531,15 @@ disconnect(#{connection := Conn, disconnect(State) -> State. %% Called only when replayq needs to dump it to disk. -msg_marshaller(Bin) when is_binary(Bin) -> emqx_portal_msg:from_binary(Bin); -msg_marshaller(Msg) -> emqx_portal_msg:to_binary(Msg). +msg_marshaller(Bin) when is_binary(Bin) -> emqx_bridge_msg:from_binary(Bin); +msg_marshaller(Msg) -> emqx_bridge_msg:to_binary(Msg). %% Return {ok, SendAckRef} or {error, Reason} maybe_send(#{connect_module := Module, connection := Connection, mountpoint := Mountpoint }, Batch) -> - Module:send(Connection, [emqx_portal_msg:to_export(Module, Mountpoint, M) || M <- Batch]). + Module:send(Connection, [emqx_bridge_msg:to_export(Module, Mountpoint, M) || M <- Batch]). format_mountpoint(undefined) -> undefined; diff --git a/src/portal/emqx_portal_mqtt.erl b/src/bridge/emqx_bridge_mqtt.erl similarity index 96% rename from src/portal/emqx_portal_mqtt.erl rename to src/bridge/emqx_bridge_mqtt.erl index 3d0b90e30..590fbabb7 100644 --- a/src/portal/emqx_portal_mqtt.erl +++ b/src/bridge/emqx_bridge_mqtt.erl @@ -12,10 +12,10 @@ %% See the License for the specific language governing permissions and %% limitations under the License. -%% @doc This module implements EMQX Portal transport layer on top of MQTT protocol +%% @doc This module implements EMQX Bridge transport layer on top of MQTT protocol --module(emqx_portal_mqtt). --behaviour(emqx_portal_connect). +-module(emqx_bridge_mqtt). +-behaviour(emqx_bridge_connect). %% behaviour callbacks -export([start/1, @@ -154,7 +154,7 @@ match_acks(Parent, Acked, Sent) -> match_acks_1(_Parent, {empty, Empty}, Sent) -> {Empty, Sent}; match_acks_1(Parent, {{value, PktId}, Acked}, [?REF_IDS(Ref, [PktId]) | Sent]) -> %% batch finished - ok = emqx_portal:handle_ack(Parent, Ref), + ok = emqx_bridge:handle_ack(Parent, Ref), match_acks(Parent, Acked, Sent); match_acks_1(Parent, {{value, PktId}, Acked}, [?REF_IDS(Ref, [PktId | RestIds]) | Sent]) -> %% one message finished, but not the whole batch @@ -171,7 +171,7 @@ handle_puback(AckCollector, #{packet_id := PktId, reason_code := RC}) -> %% Message published from remote broker. Import to local broker. import_msg(Msg) -> %% auto-ack should be enabled in emqx_client, hence dummy ack-fun. - emqx_portal:import_batch([Msg], _AckFun = fun() -> ok end). + emqx_bridge:import_batch([Msg], _AckFun = fun() -> ok end). make_hdlr(Parent, AckCollector, Ref) -> #{puback => fun(Ack) -> handle_puback(AckCollector, Ack) end, diff --git a/src/portal/emqx_portal_msg.erl b/src/bridge/emqx_bridge_msg.erl similarity index 94% rename from src/portal/emqx_portal_msg.erl rename to src/bridge/emqx_bridge_msg.erl index 45cd64154..6633027f9 100644 --- a/src/portal/emqx_portal_msg.erl +++ b/src/bridge/emqx_bridge_msg.erl @@ -12,7 +12,7 @@ %% See the License for the specific language governing permissions and %% limitations under the License. --module(emqx_portal_msg). +-module(emqx_bridge_msg). -export([ to_binary/1 , from_binary/1 @@ -37,9 +37,9 @@ %% Shame that we have to know the callback module here %% would be great if we can get rid of #mqtt_msg{} record %% and use #message{} in all places. --spec to_export(emqx_portal_rpc | emqx_portal_mqtt, +-spec to_export(emqx_bridge_rpc | emqx_bridge_mqtt, undefined | binary(), msg()) -> exp_msg(). -to_export(emqx_portal_mqtt, Mountpoint, +to_export(emqx_bridge_mqtt, Mountpoint, #message{topic = Topic, payload = Payload, flags = Flags @@ -79,6 +79,6 @@ to_broker_msg(#{qos := QoS, dup := Dup, retain := Retain, topic := Topic, %% published from remote node over a MQTT connection emqx_message:set_headers(Props, emqx_message:set_flags(#{dup => Dup, retain => Retain}, - emqx_message:make(portal, QoS, Topic, Payload))). + emqx_message:make(bridge, QoS, Topic, Payload))). topic(Prefix, Topic) -> emqx_topic:prepend(Prefix, Topic). diff --git a/src/portal/emqx_portal_rpc.erl b/src/bridge/emqx_bridge_rpc.erl similarity index 88% rename from src/portal/emqx_portal_rpc.erl rename to src/bridge/emqx_bridge_rpc.erl index 8b5600f77..b818d65da 100644 --- a/src/portal/emqx_portal_rpc.erl +++ b/src/bridge/emqx_bridge_rpc.erl @@ -12,10 +12,10 @@ %% See the License for the specific language governing permissions and %% limitations under the License. -%% @doc This module implements EMQX Portal transport layer based on gen_rpc. +%% @doc This module implements EMQX Bridge transport layer based on gen_rpc. --module(emqx_portal_rpc). --behaviour(emqx_portal_connect). +-module(emqx_bridge_rpc). +-behaviour(emqx_bridge_connect). %% behaviour callbacks -export([start/1, @@ -29,8 +29,8 @@ , heartbeat/2 ]). --type ack_ref() :: emqx_portal:ack_ref(). --type batch() :: emqx_portal:batch(). +-type ack_ref() :: emqx_bridge:ack_ref(). +-type batch() :: emqx_bridge:batch(). -define(HEARTBEAT_INTERVAL, timer:seconds(1)). @@ -58,7 +58,7 @@ stop(Pid, _Remote) when is_pid(Pid) -> end, ok. -%% @doc Callback for `emqx_portal_connect' behaviour +%% @doc Callback for `emqx_bridge_connect' behaviour -spec send(node(), batch()) -> {ok, ack_ref()} | {error, any()}. send(Remote, Batch) -> Sender = self(), @@ -73,14 +73,14 @@ handle_send(SenderPid, Batch) -> SenderNode = node(SenderPid), Ref = make_ref(), AckFun = fun() -> ?RPC:cast(SenderNode, ?MODULE, handle_ack, [SenderPid, Ref]), ok end, - case emqx_portal:import_batch(Batch, AckFun) of + case emqx_bridge:import_batch(Batch, AckFun) of ok -> {ok, Ref}; Error -> Error end. %% @doc Handle batch ack in sender node. handle_ack(SenderPid, Ref) -> - ok = emqx_portal:handle_ack(SenderPid, Ref). + ok = emqx_bridge:handle_ack(SenderPid, Ref). %% @hidden Heartbeat loop heartbeat(Parent, RemoteNode) -> diff --git a/src/portal/emqx_portal_sup.erl b/src/bridge/emqx_bridge_sup.erl similarity index 70% rename from src/portal/emqx_portal_sup.erl rename to src/bridge/emqx_bridge_sup.erl index 271c60423..bcacb411c 100644 --- a/src/portal/emqx_portal_sup.erl +++ b/src/bridge/emqx_bridge_sup.erl @@ -12,17 +12,17 @@ %% See the License for the specific language governing permissions and %% limitations under the License. --module(emqx_portal_sup). +-module(emqx_bridge_sup). -behavior(supervisor). -include("logger.hrl"). --export([start_link/0, start_link/1, portals/0]). --export([create_portal/2, drop_portal/1]). +-export([start_link/0, start_link/1, bridges/0]). +-export([create_bridge/2, drop_bridge/1]). -export([init/1]). -define(SUP, ?MODULE). --define(WORKER_SUP, emqx_portal_worker_sup). +-define(WORKER_SUP, emqx_bridge_worker_sup). start_link() -> start_link(?SUP). @@ -31,28 +31,28 @@ start_link(Name) -> init(?SUP) -> BridgesConf = emqx_config:get_env(bridges, []), - BridgeSpec = lists:map(fun portal_spec/1, BridgesConf), + BridgeSpec = lists:map(fun bridge_spec/1, BridgesConf), SupFlag = #{strategy => one_for_one, intensity => 100, period => 10}, {ok, {SupFlag, BridgeSpec}}. -portal_spec({Name, Config}) -> +bridge_spec({Name, Config}) -> #{id => Name, - start => {emqx_portal, start_link, [Name, Config]}, + start => {emqx_bridge, start_link, [Name, Config]}, restart => permanent, shutdown => 5000, type => worker, - modules => [emqx_portal]}. + modules => [emqx_bridge]}. --spec(portals() -> [{node(), map()}]). -portals() -> - [{Name, emqx_portal:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?SUP)]. +-spec(bridges() -> [{node(), map()}]). +bridges() -> + [{Name, emqx_bridge:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?SUP)]. -create_portal(Id, Config) -> - supervisor:start_child(?SUP, portal_spec({Id, Config})). +create_bridge(Id, Config) -> + supervisor:start_child(?SUP, bridge_spec({Id, Config})). -drop_portal(Id) -> +drop_bridge(Id) -> case supervisor:terminate_child(?SUP, Id) of ok -> supervisor:delete_child(?SUP, Id); diff --git a/src/emqx_portal_connect.erl b/src/emqx_bridge_connect.erl similarity index 94% rename from src/emqx_portal_connect.erl rename to src/emqx_bridge_connect.erl index 9c2d168f3..b2781cc2c 100644 --- a/src/emqx_portal_connect.erl +++ b/src/emqx_bridge_connect.erl @@ -12,7 +12,7 @@ %% See the License for the specific language governing permissions and %% limitations under the License. --module(emqx_portal_connect). +-module(emqx_bridge_connect). -export([start/2]). @@ -25,7 +25,7 @@ -type connection() :: term(). -type conn_ref() :: term(). -type batch() :: emqx_protal:batch(). --type ack_ref() :: emqx_portal:ack_ref(). +-type ack_ref() :: emqx_bridge:ack_ref(). -type topic() :: emqx_topic:topic(). -type qos() :: emqx_mqtt_types:qos(). @@ -37,7 +37,7 @@ -callback start(config()) -> {ok, conn_ref(), connection()} | {error, any()}. %% send to remote node/cluster -%% portal worker (the caller process) should be expecting +%% bridge worker (the caller process) should be expecting %% a message {batch_ack, reference()} when batch is acknowledged by remote node/cluster -callback send(connection(), batch()) -> {ok, ack_ref()} | {error, any()}. diff --git a/src/emqx_sup.erl b/src/emqx_sup.erl index 0e6ebb08a..eff33a841 100644 --- a/src/emqx_sup.erl +++ b/src/emqx_sup.erl @@ -61,7 +61,7 @@ init([]) -> RouterSup = supervisor_spec(emqx_router_sup), %% Broker Sup BrokerSup = supervisor_spec(emqx_broker_sup), - PortalSup = supervisor_spec(emqx_portal_sup), + BridgeSup = supervisor_spec(emqx_bridge_sup), %% AccessControl AccessControl = worker_spec(emqx_access_control), %% Session Manager @@ -74,7 +74,7 @@ init([]) -> [KernelSup, RouterSup, BrokerSup, - PortalSup, + BridgeSup, AccessControl, SMSup, CMSup, @@ -88,4 +88,3 @@ worker_spec(M) -> {M, {M, start_link, []}, permanent, 30000, worker, [M]}. supervisor_spec(M) -> {M, {M, start_link, []}, permanent, infinity, supervisor, [M]}. - diff --git a/test/emqx_portal_SUITE.erl b/test/emqx_bridge_SUITE.erl similarity index 79% rename from test/emqx_portal_SUITE.erl rename to test/emqx_bridge_SUITE.erl index 2ceb7ea1e..465b20637 100644 --- a/test/emqx_portal_SUITE.erl +++ b/test/emqx_bridge_SUITE.erl @@ -12,7 +12,7 @@ %% See the License for the specific language governing permissions and %% limitations under the License. --module(emqx_portal_SUITE). +-module(emqx_bridge_SUITE). -export([all/0, init_per_suite/1, end_per_suite/1]). -export([t_rpc/1, @@ -48,39 +48,39 @@ t_mngr(Config) when is_list(Config) -> Subs = [{<<"a">>, 1}, {<<"b">>, 2}], Cfg = #{address => node(), forwards => [<<"mngr">>], - connect_module => emqx_portal_rpc, + connect_module => emqx_bridge_rpc, mountpoint => <<"forwarded">>, subscriptions => Subs, start_type => auto }, Name = ?FUNCTION_NAME, - {ok, Pid} = emqx_portal:start_link(Name, Cfg), + {ok, Pid} = emqx_bridge:start_link(Name, Cfg), try - ?assertEqual([<<"mngr">>], emqx_portal:get_forwards(Name)), - ?assertEqual(ok, emqx_portal:ensure_forward_present(Name, "mngr")), - ?assertEqual(ok, emqx_portal:ensure_forward_present(Name, "mngr2")), - ?assertEqual([<<"mngr">>, <<"mngr2">>], emqx_portal:get_forwards(Pid)), - ?assertEqual(ok, emqx_portal:ensure_forward_absent(Name, "mngr2")), - ?assertEqual(ok, emqx_portal:ensure_forward_absent(Name, "mngr3")), - ?assertEqual([<<"mngr">>], emqx_portal:get_forwards(Pid)), + ?assertEqual([<<"mngr">>], emqx_bridge:get_forwards(Name)), + ?assertEqual(ok, emqx_bridge:ensure_forward_present(Name, "mngr")), + ?assertEqual(ok, emqx_bridge:ensure_forward_present(Name, "mngr2")), + ?assertEqual([<<"mngr">>, <<"mngr2">>], emqx_bridge:get_forwards(Pid)), + ?assertEqual(ok, emqx_bridge:ensure_forward_absent(Name, "mngr2")), + ?assertEqual(ok, emqx_bridge:ensure_forward_absent(Name, "mngr3")), + ?assertEqual([<<"mngr">>], emqx_bridge:get_forwards(Pid)), ?assertEqual({error, no_remote_subscription_support}, - emqx_portal:ensure_subscription_present(Pid, <<"t">>, 0)), + emqx_bridge:ensure_subscription_present(Pid, <<"t">>, 0)), ?assertEqual({error, no_remote_subscription_support}, - emqx_portal:ensure_subscription_absent(Pid, <<"t">>)), - ?assertEqual(Subs, emqx_portal:get_subscriptions(Pid)) + emqx_bridge:ensure_subscription_absent(Pid, <<"t">>)), + ?assertEqual(Subs, emqx_bridge:get_subscriptions(Pid)) after - ok = emqx_portal:stop(Pid) + ok = emqx_bridge:stop(Pid) end. %% A loopback RPC to local node t_rpc(Config) when is_list(Config) -> Cfg = #{address => node(), forwards => [<<"t_rpc/#">>], - connect_module => emqx_portal_rpc, + connect_module => emqx_bridge_rpc, mountpoint => <<"forwarded">>, start_type => auto }, - {ok, Pid} = emqx_portal:start_link(?FUNCTION_NAME, Cfg), + {ok, Pid} = emqx_bridge:start_link(?FUNCTION_NAME, Cfg), ClientId = <<"ClientId">>, try {ok, ConnPid} = emqx_mock_client:start_link(ClientId), @@ -96,13 +96,13 @@ t_rpc(Config) when is_list(Config) -> end, 4000), emqx_mock_client:close_session(ConnPid) after - ok = emqx_portal:stop(Pid) + ok = emqx_bridge:stop(Pid) end. %% Full data loopback flow explained: %% test-pid ---> mock-cleint ----> local-broker ---(local-subscription)---> -%% portal(export) --- (mqtt-connection)--> local-broker ---(remote-subscription) --> -%% portal(import) --(mecked message sending)--> test-pid +%% bridge(export) --- (mqtt-connection)--> local-broker ---(remote-subscription) --> +%% bridge(import) --(mecked message sending)--> test-pid t_mqtt(Config) when is_list(Config) -> SendToTopic = <<"t_mqtt/one">>, SendToTopic2 = <<"t_mqtt/two">>, @@ -111,7 +111,7 @@ t_mqtt(Config) when is_list(Config) -> ForwardedTopic2 = emqx_topic:join(["forwarded", atom_to_list(node()), SendToTopic2]), Cfg = #{address => "127.0.0.1:1883", forwards => [SendToTopic], - connect_module => emqx_portal_mqtt, + connect_module => emqx_bridge_mqtt, mountpoint => Mountpoint, username => "user", clean_start => true, @@ -128,26 +128,26 @@ t_mqtt(Config) when is_list(Config) -> reconnect_delay_ms => 1000, ssl => false, %% Consume back to forwarded message for verification - %% NOTE: this is a indefenite loopback without mocking emqx_portal:import_batch/2 + %% NOTE: this is a indefenite loopback without mocking emqx_bridge:import_batch/2 subscriptions => [{ForwardedTopic, _QoS = 1}], start_type => auto }, Tester = self(), Ref = make_ref(), - meck:new(emqx_portal, [passthrough, no_history]), - meck:expect(emqx_portal, import_batch, 2, + meck:new(emqx_bridge, [passthrough, no_history]), + meck:expect(emqx_bridge, import_batch, 2, fun(Batch, AckFun) -> Tester ! {Ref, Batch}, AckFun() end), - {ok, Pid} = emqx_portal:start_link(?FUNCTION_NAME, Cfg), + {ok, Pid} = emqx_bridge:start_link(?FUNCTION_NAME, Cfg), ClientId = <<"client-1">>, try - ?assertEqual([{ForwardedTopic, 1}], emqx_portal:get_subscriptions(Pid)), - ok = emqx_portal:ensure_subscription_present(Pid, ForwardedTopic2, _QoS = 1), - ok = emqx_portal:ensure_forward_present(Pid, SendToTopic2), + ?assertEqual([{ForwardedTopic, 1}], emqx_bridge:get_subscriptions(Pid)), + ok = emqx_bridge:ensure_subscription_present(Pid, ForwardedTopic2, _QoS = 1), + ok = emqx_bridge:ensure_forward_present(Pid, SendToTopic2), ?assertEqual([{ForwardedTopic, 1}, - {ForwardedTopic2, 1}], emqx_portal:get_subscriptions(Pid)), + {ForwardedTopic2, 1}], emqx_bridge:get_subscriptions(Pid)), {ok, ConnPid} = emqx_mock_client:start_link(ClientId), {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal), %% message from a different client, to avoid getting terminated by no-local @@ -166,8 +166,8 @@ t_mqtt(Config) when is_list(Config) -> ok = receive_and_match_messages(Ref, Msgs2), emqx_mock_client:close_session(ConnPid) after - ok = emqx_portal:stop(Pid), - meck:unload(emqx_portal) + ok = emqx_bridge:stop(Pid), + meck:unload(emqx_bridge) end. receive_and_match_messages(Ref, Msgs) -> diff --git a/test/emqx_portal_mqtt_tests.erl b/test/emqx_bridge_mqtt_tests.erl similarity index 91% rename from test/emqx_portal_mqtt_tests.erl rename to test/emqx_bridge_mqtt_tests.erl index 1fcd71e79..7c094b957 100644 --- a/test/emqx_portal_mqtt_tests.erl +++ b/test/emqx_bridge_mqtt_tests.erl @@ -12,7 +12,7 @@ %% See the License for the specific language governing permissions and %% limitations under the License. --module(emqx_portal_mqtt_tests). +-module(emqx_bridge_mqtt_tests). -include_lib("eunit/include/eunit.hrl"). -include("emqx_mqtt.hrl"). @@ -39,12 +39,12 @@ send_and_ack_test() -> try Max = 100, Batch = lists:seq(1, Max), - {ok, Ref, Conn} = emqx_portal_mqtt:start(#{address => "127.0.0.1:1883"}), + {ok, Ref, Conn} = emqx_bridge_mqtt:start(#{address => "127.0.0.1:1883"}), %% return last packet id as batch reference - {ok, AckRef} = emqx_portal_mqtt:send(Conn, Batch), + {ok, AckRef} = emqx_bridge_mqtt:send(Conn, Batch), %% expect batch ack receive {batch_ack, AckRef} -> ok end, - ok = emqx_portal_mqtt:stop(Ref, Conn) + ok = emqx_bridge_mqtt:stop(Ref, Conn) after meck:unload(emqx_client) end. diff --git a/test/emqx_portal_rpc_tests.erl b/test/emqx_bridge_rpc_tests.erl similarity index 80% rename from test/emqx_portal_rpc_tests.erl rename to test/emqx_bridge_rpc_tests.erl index 363a0ee5a..28e05b895 100644 --- a/test/emqx_portal_rpc_tests.erl +++ b/test/emqx_bridge_rpc_tests.erl @@ -12,7 +12,7 @@ %% See the License for the specific language governing permissions and %% limitations under the License. --module(emqx_portal_rpc_tests). +-module(emqx_bridge_rpc_tests). -include_lib("eunit/include/eunit.hrl"). send_and_ack_test() -> @@ -26,18 +26,18 @@ send_and_ack_test() -> fun(Node, Module, Fun, Args) -> rpc:cast(Node, Module, Fun, Args) end), - meck:new(emqx_portal, [passthrough, no_history]), - meck:expect(emqx_portal, import_batch, 2, + meck:new(emqx_bridge, [passthrough, no_history]), + meck:expect(emqx_bridge, import_batch, 2, fun(batch, AckFun) -> AckFun() end), try - {ok, Pid, Node} = emqx_portal_rpc:start(#{address => node()}), - {ok, Ref} = emqx_portal_rpc:send(Node, batch), + {ok, Pid, Node} = emqx_bridge_rpc:start(#{address => node()}), + {ok, Ref} = emqx_bridge_rpc:send(Node, batch), receive {batch_ack, Ref} -> ok end, - ok = emqx_portal_rpc:stop(Pid, Node) + ok = emqx_bridge_rpc:stop(Pid, Node) after meck:unload(gen_rpc), - meck:unload(emqx_portal) + meck:unload(emqx_bridge) end. diff --git a/test/emqx_portal_tests.erl b/test/emqx_bridge_tests.erl similarity index 75% rename from test/emqx_portal_tests.erl rename to test/emqx_bridge_tests.erl index 7cca66adc..22b2c4d49 100644 --- a/test/emqx_portal_tests.erl +++ b/test/emqx_bridge_tests.erl @@ -12,15 +12,15 @@ %% See the License for the specific language governing permissions and %% limitations under the License. --module(emqx_portal_tests). --behaviour(emqx_portal_connect). +-module(emqx_bridge_tests). +-behaviour(emqx_bridge_connect). -include_lib("eunit/include/eunit.hrl"). -include("emqx.hrl"). -include("emqx_mqtt.hrl"). --define(PORTAL_NAME, test). --define(PORTAL_REG_NAME, emqx_portal_test). +-define(BRIDGE_NAME, test). +-define(BRIDGE_REG_NAME, emqx_bridge_test). -define(WAIT(PATTERN, TIMEOUT), receive PATTERN -> @@ -45,29 +45,29 @@ send(SendFun, Batch) when is_function(SendFun, 1) -> stop(_Ref, _Pid) -> ok. -%% portal worker should retry connecting remote node indefinitely +%% bridge worker should retry connecting remote node indefinitely reconnect_test() -> Ref = make_ref(), Config = make_config(Ref, self(), {error, test}), - {ok, Pid} = emqx_portal:start_link(?PORTAL_NAME, Config), + {ok, Pid} = emqx_bridge:start_link(?BRIDGE_NAME, Config), %% assert name registered - ?assertEqual(Pid, whereis(?PORTAL_REG_NAME)), + ?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)), ?WAIT({connection_start_attempt, Ref}, 1000), %% expect same message again ?WAIT({connection_start_attempt, Ref}, 1000), - ok = emqx_portal:stop(?PORTAL_REG_NAME), + ok = emqx_bridge:stop(?BRIDGE_REG_NAME), ok. %% connect first, disconnect, then connect again disturbance_test() -> Ref = make_ref(), Config = make_config(Ref, self(), {ok, Ref, connection}), - {ok, Pid} = emqx_portal:start_link(?PORTAL_NAME, Config), - ?assertEqual(Pid, whereis(?PORTAL_REG_NAME)), + {ok, Pid} = emqx_bridge:start_link(?BRIDGE_NAME, Config), + ?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)), ?WAIT({connection_start_attempt, Ref}, 1000), Pid ! {disconnected, Ref, test}, ?WAIT({connection_start_attempt, Ref}, 1000), - ok = emqx_portal:stop(?PORTAL_REG_NAME). + ok = emqx_bridge:stop(?BRIDGE_REG_NAME). %% buffer should continue taking in messages when disconnected buffer_when_disconnected_test_() -> @@ -76,9 +76,9 @@ buffer_when_disconnected_test_() -> test_buffer_when_disconnected() -> Ref = make_ref(), Nums = lists:seq(1, 100), - Sender = spawn_link(fun() -> receive {portal, Pid} -> sender_loop(Pid, Nums, _Interval = 5) end end), + Sender = spawn_link(fun() -> receive {bridge, Pid} -> sender_loop(Pid, Nums, _Interval = 5) end end), SenderMref = monitor(process, Sender), - Receiver = spawn_link(fun() -> receive {portal, Pid} -> receiver_loop(Pid, Nums, _Interval = 1) end end), + Receiver = spawn_link(fun() -> receive {bridge, Pid} -> receiver_loop(Pid, Nums, _Interval = 1) end end), ReceiverMref = monitor(process, Receiver), SendFun = fun(Batch) -> BatchRef = make_ref(), @@ -87,44 +87,44 @@ test_buffer_when_disconnected() -> end, Config0 = make_config(Ref, false, {ok, Ref, SendFun}), Config = Config0#{reconnect_delay_ms => 100}, - {ok, Pid} = emqx_portal:start_link(?PORTAL_NAME, Config), - Sender ! {portal, Pid}, - Receiver ! {portal, Pid}, - ?assertEqual(Pid, whereis(?PORTAL_REG_NAME)), + {ok, Pid} = emqx_bridge:start_link(?BRIDGE_NAME, Config), + Sender ! {bridge, Pid}, + Receiver ! {bridge, Pid}, + ?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)), Pid ! {disconnected, Ref, test}, ?WAIT({'DOWN', SenderMref, process, Sender, normal}, 5000), ?WAIT({'DOWN', ReceiverMref, process, Receiver, normal}, 1000), - ok = emqx_portal:stop(?PORTAL_REG_NAME). + ok = emqx_bridge:stop(?BRIDGE_REG_NAME). manual_start_stop_test() -> Ref = make_ref(), Config0 = make_config(Ref, self(), {ok, Ref, connection}), Config = Config0#{start_type := manual}, - {ok, Pid} = emqx_portal:ensure_started(?PORTAL_NAME, Config), + {ok, Pid} = emqx_bridge:ensure_started(?BRIDGE_NAME, Config), %% call ensure_started again should yeld the same result - {ok, Pid} = emqx_portal:ensure_started(?PORTAL_NAME, Config), - ?assertEqual(Pid, whereis(?PORTAL_REG_NAME)), + {ok, Pid} = emqx_bridge:ensure_started(?BRIDGE_NAME, Config), + ?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)), ?assertEqual({error, standing_by}, - emqx_portal:ensure_forward_present(Pid, "dummy")), - emqx_portal:ensure_stopped(unknown), - emqx_portal:ensure_stopped(Pid), - emqx_portal:ensure_stopped(?PORTAL_REG_NAME). + emqx_bridge:ensure_forward_present(Pid, "dummy")), + emqx_bridge:ensure_stopped(unknown), + emqx_bridge:ensure_stopped(Pid), + emqx_bridge:ensure_stopped(?BRIDGE_REG_NAME). -%% Feed messages to portal +%% Feed messages to bridge sender_loop(_Pid, [], _) -> exit(normal); sender_loop(Pid, [Num | Rest], Interval) -> random_sleep(Interval), Pid ! {dispatch, dummy, make_msg(Num)}, sender_loop(Pid, Rest, Interval). -%% Feed acknowledgments to portal +%% Feed acknowledgments to bridge receiver_loop(_Pid, [], _) -> ok; receiver_loop(Pid, Nums, Interval) -> receive {batch, BatchRef, Batch} -> Rest = match_nums(Batch, Nums), random_sleep(Interval), - emqx_portal:handle_ack(Pid, BatchRef), + emqx_bridge:handle_ack(Pid, BatchRef), receiver_loop(Pid, Rest, Interval) end. diff --git a/test/emqx_ct_broker_helpers.erl b/test/emqx_ct_broker_helpers.erl index 0e0bfa3a4..88240be85 100644 --- a/test/emqx_ct_broker_helpers.erl +++ b/test/emqx_ct_broker_helpers.erl @@ -170,13 +170,13 @@ flush(Msgs) -> bridge_conf() -> [ {local_rpc, - [{connect_module, emqx_portal_rpc}, + [{connect_module, emqx_bridge_rpc}, {address, node()}, - {forwards, ["portal-1/#", "portal-2/#"]} + {forwards, ["bridge-1/#", "bridge-2/#"]} ]} ]. % [{aws, - % [{connect_module, emqx_portal_mqtt}, + % [{connect_module, emqx_bridge_mqtt}, % {username,"user"}, % {address,"127.0.0.1:1883"}, % {clean_start,true},