diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index fd67c622c..1a3edb484 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -58,14 +58,14 @@ ). -if(?EMQX_RELEASE_EDITION == ee). -bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt; -bridge_to_resource_type(mqtt) -> emqx_connector_mqtt; +bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector; +bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector; bridge_to_resource_type(<<"webhook">>) -> emqx_connector_http; bridge_to_resource_type(webhook) -> emqx_connector_http; bridge_to_resource_type(BridgeType) -> emqx_ee_bridge:resource_type(BridgeType). -else. -bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt; -bridge_to_resource_type(mqtt) -> emqx_connector_mqtt; +bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector; +bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector; bridge_to_resource_type(<<"webhook">>) -> emqx_connector_http; bridge_to_resource_type(webhook) -> emqx_connector_http. -endif. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src new file mode 100644 index 000000000..54d7ffbed --- /dev/null +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src @@ -0,0 +1,18 @@ +%% -*- mode: erlang -*- +{application, emqx_bridge_mqtt, [ + {description, "EMQX MQTT Broker Bridge"}, + {vsn, "0.1.0"}, + {registered, []}, + {applications, [ + kernel, + stdlib, + emqx, + emqx_resource, + emqx_bridge, + emqtt + ]}, + {env, []}, + {modules, []}, + {licenses, ["Apache 2.0"]}, + {links, []} +]}. diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl similarity index 95% rename from apps/emqx_connector/src/emqx_connector_mqtt.erl rename to apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index 255247011..13be52a0c 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -13,7 +13,7 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_connector_mqtt). +-module(emqx_bridge_mqtt_connector). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -83,7 +83,7 @@ start_ingress(ResourceId, Ingress, ClientOpts) -> {ingress, Ingress}, {client_opts, ClientOpts} ], - case emqx_resource_pool:start(PoolName, emqx_connector_mqtt_ingress, Options) of + case emqx_resource_pool:start(PoolName, emqx_bridge_mqtt_ingress, Options) of ok -> {ok, #{ingress_pool_name => PoolName}}; {error, {start_pool_failed, _, Reason}} -> @@ -128,11 +128,11 @@ start_egress(ResourceId, Egress, ClientOpts) -> {pool_size, PoolSize}, {client_opts, ClientOpts} ], - case emqx_resource_pool:start(PoolName, emqx_connector_mqtt_egress, Options) of + case emqx_resource_pool:start(PoolName, emqx_bridge_mqtt_egress, Options) of ok -> {ok, #{ egress_pool_name => PoolName, - egress_config => emqx_connector_mqtt_egress:config(Egress) + egress_config => emqx_bridge_mqtt_egress:config(Egress) }}; {error, {start_pool_failed, _, Reason}} -> {error, Reason} @@ -197,7 +197,7 @@ on_query_async(ResourceId, {send_message, Msg}, _Callback, #{}) -> }). with_egress_client(ResourceId, Fun, Args) -> - ecpool:pick_and_do(ResourceId, {emqx_connector_mqtt_egress, Fun, Args}, no_handover). + ecpool:pick_and_do(ResourceId, {emqx_bridge_mqtt_egress, Fun, Args}, no_handover). on_async_result(Callback, Result) -> apply_callback_function(Callback, handle_send_result(Result)). @@ -250,9 +250,9 @@ on_get_status(_ResourceId, State) -> get_status({Pool, Worker}) -> case ecpool_worker:client(Worker) of {ok, Client} when Pool == ingress_pool_name -> - emqx_connector_mqtt_ingress:status(Client); + emqx_bridge_mqtt_ingress:status(Client); {ok, Client} when Pool == egress_pool_name -> - emqx_connector_mqtt_egress:status(Client); + emqx_bridge_mqtt_egress:status(Client); {error, _} -> disconnected end. @@ -300,7 +300,7 @@ mk_client_opts( ssl := #{enable := EnableSsl} = Ssl } ) -> - HostPort = emqx_connector_mqtt_schema:parse_server(Server), + HostPort = emqx_bridge_mqtt_connector_schema:parse_server(Server), Options = maps:with( [ proto_ver, diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl similarity index 95% rename from apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl rename to apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl index 6d06029b9..1dc3ca5f8 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_connector_mqtt_schema). +-module(emqx_bridge_mqtt_connector_schema). -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). @@ -138,13 +138,13 @@ fields("ingress") -> {remote, mk( ref(?MODULE, "ingress_remote"), - #{desc => ?DESC(emqx_connector_mqtt_schema, "ingress_remote")} + #{desc => ?DESC("ingress_remote")} )}, {local, mk( ref(?MODULE, "ingress_local"), #{ - desc => ?DESC(emqx_connector_mqtt_schema, "ingress_local") + desc => ?DESC("ingress_local") } )} ]; @@ -211,7 +211,7 @@ fields("egress") -> mk( ref(?MODULE, "egress_local"), #{ - desc => ?DESC(emqx_connector_mqtt_schema, "egress_local"), + desc => ?DESC("egress_local"), required => false } )}, @@ -219,7 +219,7 @@ fields("egress") -> mk( ref(?MODULE, "egress_remote"), #{ - desc => ?DESC(emqx_connector_mqtt_schema, "egress_remote"), + desc => ?DESC("egress_remote"), required => true } )} diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_egress.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl similarity index 94% rename from apps/emqx_connector/src/mqtt/emqx_connector_mqtt_egress.erl rename to apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl index 0e413cbc9..673a30726 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_egress.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_connector_mqtt_egress). +-module(emqx_bridge_mqtt_egress). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx.hrl"). @@ -51,7 +51,7 @@ local => #{ topic => emqx_topic:topic() }, - remote := emqx_connector_mqtt_msg:msgvars() + remote := emqx_bridge_mqtt_msg:msgvars() }. %% @doc Start an ingress bridge worker. @@ -102,7 +102,7 @@ connect(Pid, Name) -> -spec config(map()) -> egress(). config(#{remote := RC = #{}} = Conf) -> - Conf#{remote => emqx_connector_mqtt_msg:parse(RC)}. + Conf#{remote => emqx_bridge_mqtt_msg:parse(RC)}. -spec send(pid(), message(), egress()) -> ok. @@ -118,7 +118,7 @@ send_async(Pid, MsgIn, Callback, Egress) -> export_msg(Msg, #{remote := Remote}) -> to_remote_msg(Msg, Remote). --spec to_remote_msg(message(), emqx_connector_mqtt_msg:msgvars()) -> +-spec to_remote_msg(message(), emqx_bridge_mqtt_msg:msgvars()) -> remote_message(). to_remote_msg(#message{flags = Flags} = Msg, Vars) -> {EventMsg, _} = emqx_rule_events:eventmsg_publish(Msg), @@ -129,7 +129,7 @@ to_remote_msg(Msg = #{}, Remote) -> payload := Payload, qos := QoS, retain := Retain - } = emqx_connector_mqtt_msg:render(Msg, Remote), + } = emqx_bridge_mqtt_msg:render(Msg, Remote), PubProps = maps:get(pub_props, Msg, #{}), #mqtt_msg{ qos = QoS, diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_ingress.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl similarity index 97% rename from apps/emqx_connector/src/mqtt/emqx_connector_mqtt_ingress.erl rename to apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl index c11895c49..78bbf7753 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_ingress.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_connector_mqtt_ingress). +-module(emqx_bridge_mqtt_ingress). -include_lib("emqx/include/logger.hrl"). @@ -46,7 +46,7 @@ topic := emqx_topic:topic(), qos => emqx_types:qos() }, - local := emqx_connector_mqtt_msg:msgvars(), + local := emqx_bridge_mqtt_msg:msgvars(), on_message_received := {module(), atom(), [term()]} }. @@ -135,7 +135,7 @@ subscribe_remote_topic(Pid, #{remote := #{topic := RemoteTopic, qos := QoS}}) -> config(#{remote := RC, local := LC} = Conf, BridgeName) -> Conf#{ remote => parse_remote(RC, BridgeName), - local => emqx_connector_mqtt_msg:parse(LC) + local => emqx_bridge_mqtt_msg:parse(LC) }. parse_remote(#{qos := QoSIn} = Conf, BridgeName) -> @@ -261,7 +261,7 @@ to_broker_msg(#{dup := Dup} = Msg, Local, Props) -> payload := Payload, qos := QoS, retain := Retain - } = emqx_connector_mqtt_msg:render(Msg, Local), + } = emqx_bridge_mqtt_msg:render(Msg, Local), PubProps = maps:get(pub_props, Msg, #{}), emqx_message:set_headers( Props#{properties => emqx_utils:pub_props_to_packet(PubProps)}, diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_msg.erl similarity index 98% rename from apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl rename to apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_msg.erl index b57d69df6..8a8cffe55 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_msg.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_connector_mqtt_msg). +-module(emqx_bridge_mqtt_msg). -export([parse/1]). -export([render/2]). diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_schema.erl similarity index 97% rename from apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl rename to apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_schema.erl index 5cd1693c7..a312dfaa9 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_schema.erl @@ -42,7 +42,7 @@ fields("config") -> } )} ] ++ - emqx_connector_mqtt_schema:fields("config"); + emqx_bridge_mqtt_connector_schema:fields("config"); fields("creation_opts") -> Opts = emqx_resource_schema:fields("creation_opts"), [O || {Field, _} = O <- Opts, not is_hidden_opts(Field)]; diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl similarity index 99% rename from apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl rename to apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl index 6c36e08e7..81f9a3573 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl @@ -130,13 +130,11 @@ suite() -> init_per_suite(Config) -> _ = application:load(emqx_conf), - %% some testcases (may from other app) already get emqx_connector started - _ = application:stop(emqx_resource), - _ = application:stop(emqx_connector), ok = emqx_common_test_helpers:start_apps( [ emqx_rule_engine, emqx_bridge, + emqx_bridge_mqtt, emqx_dashboard ], fun set_special_configs/1 @@ -150,9 +148,10 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_common_test_helpers:stop_apps([ - emqx_rule_engine, + emqx_dashboard, + emqx_bridge_mqtt, emqx_bridge, - emqx_dashboard + emqx_rule_engine ]), ok. @@ -307,7 +306,7 @@ t_mqtt_egress_bridge_ignores_clean_start(_) -> emqx_resource_manager:lookup_cached(ResourceID), ClientInfo = ecpool:pick_and_do( EgressPoolName, - {emqx_connector_mqtt_egress, info, []}, + {emqx_bridge_mqtt_egress, info, []}, no_handover ), ?assertMatch( diff --git a/mix.exs b/mix.exs index 3fa3cc2bc..7983a7482 100644 --- a/mix.exs +++ b/mix.exs @@ -372,6 +372,7 @@ defmodule EMQXUmbrella.MixProject do emqx_gateway_exproto: :permanent, emqx_exhook: :permanent, emqx_bridge: :permanent, + emqx_bridge_mqtt: :permanent, emqx_rule_engine: :permanent, emqx_modules: :permanent, emqx_management: :permanent, diff --git a/rebar.config.erl b/rebar.config.erl index a8474a703..6c54dffca 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -426,6 +426,7 @@ relx_apps(ReleaseType, Edition) -> emqx_gateway_exproto, emqx_exhook, emqx_bridge, + emqx_bridge_mqtt, emqx_rule_engine, emqx_modules, emqx_management, diff --git a/rel/i18n/emqx_connector_mqtt_schema.hocon b/rel/i18n/emqx_bridge_mqtt_connector_schema.hocon similarity index 99% rename from rel/i18n/emqx_connector_mqtt_schema.hocon rename to rel/i18n/emqx_bridge_mqtt_connector_schema.hocon index 509fc4209..ed7a59dcd 100644 --- a/rel/i18n/emqx_connector_mqtt_schema.hocon +++ b/rel/i18n/emqx_bridge_mqtt_connector_schema.hocon @@ -1,4 +1,4 @@ -emqx_connector_mqtt_schema { +emqx_bridge_mqtt_connector_schema { bridge_mode.desc: """If enable bridge mode.