refactor(mqttbridge): move into separate application

This commit is contained in:
Andrew Mayorov 2023-05-29 22:00:38 +03:00
parent ebd612b194
commit 7e7b50c5ba
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
12 changed files with 54 additions and 35 deletions

View File

@ -58,14 +58,14 @@
).
-if(?EMQX_RELEASE_EDITION == ee).
bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt;
bridge_to_resource_type(mqtt) -> emqx_connector_mqtt;
bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector;
bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector;
bridge_to_resource_type(<<"webhook">>) -> emqx_connector_http;
bridge_to_resource_type(webhook) -> emqx_connector_http;
bridge_to_resource_type(BridgeType) -> emqx_ee_bridge:resource_type(BridgeType).
-else.
bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt;
bridge_to_resource_type(mqtt) -> emqx_connector_mqtt;
bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector;
bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector;
bridge_to_resource_type(<<"webhook">>) -> emqx_connector_http;
bridge_to_resource_type(webhook) -> emqx_connector_http.
-endif.

View File

@ -0,0 +1,18 @@
%% -*- mode: erlang -*-
{application, emqx_bridge_mqtt, [
{description, "EMQX MQTT Broker Bridge"},
{vsn, "0.1.0"},
{registered, []},
{applications, [
kernel,
stdlib,
emqx,
emqx_resource,
emqx_bridge,
emqtt
]},
{env, []},
{modules, []},
{licenses, ["Apache 2.0"]},
{links, []}
]}.

View File

@ -13,7 +13,7 @@
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_mqtt).
-module(emqx_bridge_mqtt_connector).
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").
@ -83,7 +83,7 @@ start_ingress(ResourceId, Ingress, ClientOpts) ->
{ingress, Ingress},
{client_opts, ClientOpts}
],
case emqx_resource_pool:start(PoolName, emqx_connector_mqtt_ingress, Options) of
case emqx_resource_pool:start(PoolName, emqx_bridge_mqtt_ingress, Options) of
ok ->
{ok, #{ingress_pool_name => PoolName}};
{error, {start_pool_failed, _, Reason}} ->
@ -128,11 +128,11 @@ start_egress(ResourceId, Egress, ClientOpts) ->
{pool_size, PoolSize},
{client_opts, ClientOpts}
],
case emqx_resource_pool:start(PoolName, emqx_connector_mqtt_egress, Options) of
case emqx_resource_pool:start(PoolName, emqx_bridge_mqtt_egress, Options) of
ok ->
{ok, #{
egress_pool_name => PoolName,
egress_config => emqx_connector_mqtt_egress:config(Egress)
egress_config => emqx_bridge_mqtt_egress:config(Egress)
}};
{error, {start_pool_failed, _, Reason}} ->
{error, Reason}
@ -197,7 +197,7 @@ on_query_async(ResourceId, {send_message, Msg}, _Callback, #{}) ->
}).
with_egress_client(ResourceId, Fun, Args) ->
ecpool:pick_and_do(ResourceId, {emqx_connector_mqtt_egress, Fun, Args}, no_handover).
ecpool:pick_and_do(ResourceId, {emqx_bridge_mqtt_egress, Fun, Args}, no_handover).
on_async_result(Callback, Result) ->
apply_callback_function(Callback, handle_send_result(Result)).
@ -250,9 +250,9 @@ on_get_status(_ResourceId, State) ->
get_status({Pool, Worker}) ->
case ecpool_worker:client(Worker) of
{ok, Client} when Pool == ingress_pool_name ->
emqx_connector_mqtt_ingress:status(Client);
emqx_bridge_mqtt_ingress:status(Client);
{ok, Client} when Pool == egress_pool_name ->
emqx_connector_mqtt_egress:status(Client);
emqx_bridge_mqtt_egress:status(Client);
{error, _} ->
disconnected
end.
@ -300,7 +300,7 @@ mk_client_opts(
ssl := #{enable := EnableSsl} = Ssl
}
) ->
HostPort = emqx_connector_mqtt_schema:parse_server(Server),
HostPort = emqx_bridge_mqtt_connector_schema:parse_server(Server),
Options = maps:with(
[
proto_ver,

View File

@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_mqtt_schema).
-module(emqx_bridge_mqtt_connector_schema).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
@ -138,13 +138,13 @@ fields("ingress") ->
{remote,
mk(
ref(?MODULE, "ingress_remote"),
#{desc => ?DESC(emqx_connector_mqtt_schema, "ingress_remote")}
#{desc => ?DESC("ingress_remote")}
)},
{local,
mk(
ref(?MODULE, "ingress_local"),
#{
desc => ?DESC(emqx_connector_mqtt_schema, "ingress_local")
desc => ?DESC("ingress_local")
}
)}
];
@ -211,7 +211,7 @@ fields("egress") ->
mk(
ref(?MODULE, "egress_local"),
#{
desc => ?DESC(emqx_connector_mqtt_schema, "egress_local"),
desc => ?DESC("egress_local"),
required => false
}
)},
@ -219,7 +219,7 @@ fields("egress") ->
mk(
ref(?MODULE, "egress_remote"),
#{
desc => ?DESC(emqx_connector_mqtt_schema, "egress_remote"),
desc => ?DESC("egress_remote"),
required => true
}
)}

View File

@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_mqtt_egress).
-module(emqx_bridge_mqtt_egress).
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx.hrl").
@ -51,7 +51,7 @@
local => #{
topic => emqx_topic:topic()
},
remote := emqx_connector_mqtt_msg:msgvars()
remote := emqx_bridge_mqtt_msg:msgvars()
}.
%% @doc Start an ingress bridge worker.
@ -102,7 +102,7 @@ connect(Pid, Name) ->
-spec config(map()) ->
egress().
config(#{remote := RC = #{}} = Conf) ->
Conf#{remote => emqx_connector_mqtt_msg:parse(RC)}.
Conf#{remote => emqx_bridge_mqtt_msg:parse(RC)}.
-spec send(pid(), message(), egress()) ->
ok.
@ -118,7 +118,7 @@ send_async(Pid, MsgIn, Callback, Egress) ->
export_msg(Msg, #{remote := Remote}) ->
to_remote_msg(Msg, Remote).
-spec to_remote_msg(message(), emqx_connector_mqtt_msg:msgvars()) ->
-spec to_remote_msg(message(), emqx_bridge_mqtt_msg:msgvars()) ->
remote_message().
to_remote_msg(#message{flags = Flags} = Msg, Vars) ->
{EventMsg, _} = emqx_rule_events:eventmsg_publish(Msg),
@ -129,7 +129,7 @@ to_remote_msg(Msg = #{}, Remote) ->
payload := Payload,
qos := QoS,
retain := Retain
} = emqx_connector_mqtt_msg:render(Msg, Remote),
} = emqx_bridge_mqtt_msg:render(Msg, Remote),
PubProps = maps:get(pub_props, Msg, #{}),
#mqtt_msg{
qos = QoS,

View File

@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_mqtt_ingress).
-module(emqx_bridge_mqtt_ingress).
-include_lib("emqx/include/logger.hrl").
@ -46,7 +46,7 @@
topic := emqx_topic:topic(),
qos => emqx_types:qos()
},
local := emqx_connector_mqtt_msg:msgvars(),
local := emqx_bridge_mqtt_msg:msgvars(),
on_message_received := {module(), atom(), [term()]}
}.
@ -135,7 +135,7 @@ subscribe_remote_topic(Pid, #{remote := #{topic := RemoteTopic, qos := QoS}}) ->
config(#{remote := RC, local := LC} = Conf, BridgeName) ->
Conf#{
remote => parse_remote(RC, BridgeName),
local => emqx_connector_mqtt_msg:parse(LC)
local => emqx_bridge_mqtt_msg:parse(LC)
}.
parse_remote(#{qos := QoSIn} = Conf, BridgeName) ->
@ -261,7 +261,7 @@ to_broker_msg(#{dup := Dup} = Msg, Local, Props) ->
payload := Payload,
qos := QoS,
retain := Retain
} = emqx_connector_mqtt_msg:render(Msg, Local),
} = emqx_bridge_mqtt_msg:render(Msg, Local),
PubProps = maps:get(pub_props, Msg, #{}),
emqx_message:set_headers(
Props#{properties => emqx_utils:pub_props_to_packet(PubProps)},

View File

@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_mqtt_msg).
-module(emqx_bridge_mqtt_msg).
-export([parse/1]).
-export([render/2]).

View File

@ -42,7 +42,7 @@ fields("config") ->
}
)}
] ++
emqx_connector_mqtt_schema:fields("config");
emqx_bridge_mqtt_connector_schema:fields("config");
fields("creation_opts") ->
Opts = emqx_resource_schema:fields("creation_opts"),
[O || {Field, _} = O <- Opts, not is_hidden_opts(Field)];

View File

@ -130,13 +130,11 @@ suite() ->
init_per_suite(Config) ->
_ = application:load(emqx_conf),
%% some testcases (may from other app) already get emqx_connector started
_ = application:stop(emqx_resource),
_ = application:stop(emqx_connector),
ok = emqx_common_test_helpers:start_apps(
[
emqx_rule_engine,
emqx_bridge,
emqx_bridge_mqtt,
emqx_dashboard
],
fun set_special_configs/1
@ -150,9 +148,10 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([
emqx_rule_engine,
emqx_dashboard,
emqx_bridge_mqtt,
emqx_bridge,
emqx_dashboard
emqx_rule_engine
]),
ok.
@ -307,7 +306,7 @@ t_mqtt_egress_bridge_ignores_clean_start(_) ->
emqx_resource_manager:lookup_cached(ResourceID),
ClientInfo = ecpool:pick_and_do(
EgressPoolName,
{emqx_connector_mqtt_egress, info, []},
{emqx_bridge_mqtt_egress, info, []},
no_handover
),
?assertMatch(

View File

@ -372,6 +372,7 @@ defmodule EMQXUmbrella.MixProject do
emqx_gateway_exproto: :permanent,
emqx_exhook: :permanent,
emqx_bridge: :permanent,
emqx_bridge_mqtt: :permanent,
emqx_rule_engine: :permanent,
emqx_modules: :permanent,
emqx_management: :permanent,

View File

@ -426,6 +426,7 @@ relx_apps(ReleaseType, Edition) ->
emqx_gateway_exproto,
emqx_exhook,
emqx_bridge,
emqx_bridge_mqtt,
emqx_rule_engine,
emqx_modules,
emqx_management,

View File

@ -1,4 +1,4 @@
emqx_connector_mqtt_schema {
emqx_bridge_mqtt_connector_schema {
bridge_mode.desc:
"""If enable bridge mode.