From bfb2df37ce7cf7f03b955817432fe4e206ef480a Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 6 Sep 2021 17:32:46 +0800 Subject: [PATCH] refactor(bridge): rename emqx_data_bridge to emqx_bridge --- .../.gitignore | 0 .../README.md | 2 +- .../etc/emqx_bridge.conf} | 49 ++++++++++++++++--- .../rebar.config | 2 +- .../src/emqx_bridge.app.src} | 4 +- .../src/emqx_bridge.erl} | 12 +++-- .../src/emqx_bridge_api.erl} | 28 +++++------ .../src/emqx_bridge_app.erl} | 8 +-- .../src/emqx_bridge_monitor.erl} | 14 ++++-- apps/emqx_bridge/src/emqx_bridge_schema.erl | 17 +++++++ .../src/emqx_bridge_sup.erl} | 8 +-- apps/emqx_machine/src/emqx_machine.erl | 2 +- apps/emqx_machine/src/emqx_machine_schema.erl | 2 +- apps/emqx_resource/README.md | 2 +- apps/emqx_resource/examples/demo.md | 6 +-- rebar.config.erl | 2 +- 16 files changed, 107 insertions(+), 51 deletions(-) rename apps/{emqx_data_bridge => emqx_bridge}/.gitignore (100%) rename apps/{emqx_data_bridge => emqx_bridge}/README.md (95%) rename apps/{emqx_data_bridge/etc/emqx_data_bridge.conf => emqx_bridge/etc/emqx_bridge.conf} (78%) rename apps/{emqx_data_bridge => emqx_bridge}/rebar.config (73%) rename apps/{emqx_data_bridge/src/emqx_data_bridge.app.src => emqx_bridge/src/emqx_bridge.app.src} (75%) rename apps/{emqx_data_bridge/src/emqx_data_bridge.erl => emqx_bridge/src/emqx_bridge.erl} (86%) rename apps/{emqx_data_bridge/src/emqx_data_bridge_api.erl => emqx_bridge/src/emqx_bridge_api.erl} (83%) rename apps/{emqx_data_bridge/src/emqx_data_bridge_app.erl => emqx_bridge/src/emqx_bridge_app.erl} (87%) rename apps/{emqx_data_bridge/src/emqx_data_bridge_monitor.erl => emqx_bridge/src/emqx_bridge_monitor.erl} (84%) create mode 100644 apps/emqx_bridge/src/emqx_bridge_schema.erl rename apps/{emqx_data_bridge/src/emqx_data_bridge_sup.erl => emqx_bridge/src/emqx_bridge_sup.erl} (86%) diff --git a/apps/emqx_data_bridge/.gitignore b/apps/emqx_bridge/.gitignore similarity index 100% rename from apps/emqx_data_bridge/.gitignore rename to apps/emqx_bridge/.gitignore diff --git a/apps/emqx_data_bridge/README.md b/apps/emqx_bridge/README.md similarity index 95% rename from apps/emqx_data_bridge/README.md rename to apps/emqx_bridge/README.md index 8f76f17a5..0f274eea1 100644 --- a/apps/emqx_data_bridge/README.md +++ b/apps/emqx_bridge/README.md @@ -1,4 +1,4 @@ -# emqx_data_bridge +# emqx_bridge EMQ X Data Bridge is an application that managing the resources (see emqx_resource) used by emqx rule engine. diff --git a/apps/emqx_data_bridge/etc/emqx_data_bridge.conf b/apps/emqx_bridge/etc/emqx_bridge.conf similarity index 78% rename from apps/emqx_data_bridge/etc/emqx_data_bridge.conf rename to apps/emqx_bridge/etc/emqx_bridge.conf index 99a49dba3..663ae6586 100644 --- a/apps/emqx_data_bridge/etc/emqx_data_bridge.conf +++ b/apps/emqx_bridge/etc/emqx_bridge.conf @@ -1,10 +1,46 @@ ##-------------------------------------------------------------------- -## EMQ X Bridge Plugin +## EMQ X Bridge ##-------------------------------------------------------------------- -emqx_data_bridge { - bridges:[ - # {name: "mysql_bridge_1" +bridges.mqtt.my_mqtt_bridge { + server = "127.0.0.1:1883" + proto_ver = "v4" + clientid = "client1" + username = "username1" + password = "" + clean_start = true + keepalive = 300 + retry_interval = "30s" + max_inflight = 32 + reconnect_interval = "30s" + bridge_mode = true + replayq { + dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/" + seg_bytes = "100MB" + offload = false + max_total_bytes = "1GB" + } + ssl { + enable = false + keyfile = "{{ platform_etc_dir }}/certs/client-key.pem" + certfile = "{{ platform_etc_dir }}/certs/client-cert.pem" + cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" + } + in [{ + from_remote_topic = "msg/#" + to_local_topic = "from_aws/${topic}" + payload_template = "${message}" + qos = 1 + }] + out [{ + from_local_topic = "msg/#" + to_remote_topic = "from_emqx/${topic}" + payload_template = "${message}" + }] +} + + +# {name: "mysql_bridge_1" # type: mysql # config: { # server: "192.168.0.172:3306" @@ -123,7 +159,4 @@ emqx_data_bridge { # pool_size: 1 # ssl: false # } - # } - - ] -} + # } \ No newline at end of file diff --git a/apps/emqx_data_bridge/rebar.config b/apps/emqx_bridge/rebar.config similarity index 73% rename from apps/emqx_data_bridge/rebar.config rename to apps/emqx_bridge/rebar.config index cf4cfcf1b..3fd6b41e0 100644 --- a/apps/emqx_data_bridge/rebar.config +++ b/apps/emqx_bridge/rebar.config @@ -3,5 +3,5 @@ {shell, [ % {config, "config/sys.config"}, - {apps, [emqx_data_bridge]} + {apps, [emqx_bridge]} ]}. diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src similarity index 75% rename from apps/emqx_data_bridge/src/emqx_data_bridge.app.src rename to apps/emqx_bridge/src/emqx_bridge.app.src index 84486da19..42fc245f5 100644 --- a/apps/emqx_data_bridge/src/emqx_data_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,8 +1,8 @@ -{application, emqx_data_bridge, +{application, emqx_bridge, [{description, "An OTP application"}, {vsn, "0.1.0"}, {registered, []}, - {mod, {emqx_data_bridge_app, []}}, + {mod, {emqx_bridge_app, []}}, {applications, [kernel, stdlib, diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl similarity index 86% rename from apps/emqx_data_bridge/src/emqx_data_bridge.erl rename to apps/emqx_bridge/src/emqx_bridge.erl index 52cea80fb..4e05f8e96 100644 --- a/apps/emqx_data_bridge/src/emqx_data_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -13,7 +13,7 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_data_bridge). +-module(emqx_bridge). -export([ load_bridges/0 , resource_type/1 @@ -27,15 +27,17 @@ ]). load_bridges() -> - Bridges = emqx:get_config([emqx_data_bridge, bridges], []), - emqx_data_bridge_monitor:ensure_all_started(Bridges). + Bridges = emqx:get_config([bridges], #{}), + emqx_bridge_monitor:ensure_all_started(Bridges). +resource_type(mqtt) -> emqx_connector_mqtt; resource_type(mysql) -> emqx_connector_mysql; resource_type(pgsql) -> emqx_connector_pgsql; resource_type(mongo) -> emqx_connector_mongo; resource_type(redis) -> emqx_connector_redis; resource_type(ldap) -> emqx_connector_ldap. +bridge_type(emqx_connector_mqtt) -> mqtt; bridge_type(emqx_connector_mysql) -> mysql; bridge_type(emqx_connector_pgsql) -> pgsql; bridge_type(emqx_connector_mongo) -> mongo; @@ -49,7 +51,7 @@ resource_id_to_name(<<"bridge:", BridgeName/binary>> = _ResourceId) -> BridgeName. list_bridges() -> - emqx_resource_api:list_instances(fun emqx_data_bridge:is_bridge/1). + emqx_resource_api:list_instances(fun emqx_bridge:is_bridge/1). is_bridge(#{id := <<"bridge:", _/binary>>}) -> true; @@ -57,7 +59,7 @@ is_bridge(_Data) -> false. config_key_path() -> - [emqx_data_bridge, bridges]. + [emqx_bridge, bridges]. update_config(ConfigReq) -> emqx:update_config(config_key_path(), ConfigReq). diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl similarity index 83% rename from apps/emqx_data_bridge/src/emqx_data_bridge_api.erl rename to apps/emqx_bridge/src/emqx_bridge_api.erl index 6fe75e4ce..c10875e55 100644 --- a/apps/emqx_data_bridge/src/emqx_data_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -13,7 +13,7 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_data_bridge_api). +-module(emqx_bridge_api). -rest_api(#{ name => list_data_bridges , method => 'GET' @@ -61,10 +61,10 @@ list_bridges(_Binding, _Params) -> {200, #{code => 0, data => [format_api_reply(Data) || - Data <- emqx_data_bridge:list_bridges()]}}. + Data <- emqx_bridge:list_bridges()]}}. get_bridge(#{name := Name}, _Params) -> - case emqx_resource:get_instance(emqx_data_bridge:name_to_resource_id(Name)) of + case emqx_resource:get_instance(emqx_bridge:name_to_resource_id(Name)) of {ok, Data} -> {200, #{code => 0, data => format_api_reply(emqx_resource_api:format_data(Data))}}; {error, not_found} -> @@ -75,8 +75,8 @@ create_bridge(#{name := Name}, Params) -> Config = proplists:get_value(<<"config">>, Params), BridgeType = proplists:get_value(<<"type">>, Params), case emqx_resource:check_and_create( - emqx_data_bridge:name_to_resource_id(Name), - emqx_data_bridge:resource_type(atom(BridgeType)), maps:from_list(Config)) of + emqx_bridge:name_to_resource_id(Name), + emqx_bridge:resource_type(atom(BridgeType)), maps:from_list(Config)) of {ok, already_created} -> {400, #{code => 102, message => <<"bridge already created: ", Name/binary>>}}; {ok, Data} -> @@ -91,8 +91,8 @@ update_bridge(#{name := Name}, Params) -> Config = proplists:get_value(<<"config">>, Params), BridgeType = proplists:get_value(<<"type">>, Params), case emqx_resource:check_and_update( - emqx_data_bridge:name_to_resource_id(Name), - emqx_data_bridge:resource_type(atom(BridgeType)), maps:from_list(Config), []) of + emqx_bridge:name_to_resource_id(Name), + emqx_bridge:resource_type(atom(BridgeType)), maps:from_list(Config), []) of {ok, Data} -> update_config_and_reply(Name, BridgeType, Config, Data); {error, not_found} -> @@ -104,26 +104,26 @@ update_bridge(#{name := Name}, Params) -> end. delete_bridge(#{name := Name}, _Params) -> - case emqx_resource:remove(emqx_data_bridge:name_to_resource_id(Name)) of + case emqx_resource:remove(emqx_bridge:name_to_resource_id(Name)) of ok -> delete_config_and_reply(Name); {error, Reason} -> {500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}} end. format_api_reply(#{resource_type := Type, id := Id, config := Conf, status := Status}) -> - #{type => emqx_data_bridge:bridge_type(Type), - name => emqx_data_bridge:resource_id_to_name(Id), + #{type => emqx_bridge:bridge_type(Type), + name => emqx_bridge:resource_id_to_name(Id), config => Conf, status => Status}. % format_conf(#{resource_type := Type, id := Id, config := Conf}) -> -% #{type => Type, name => emqx_data_bridge:resource_id_to_name(Id), +% #{type => Type, name => emqx_bridge:resource_id_to_name(Id), % config => Conf}. % get_all_configs() -> -% [format_conf(Data) || Data <- emqx_data_bridge:list_bridges()]. +% [format_conf(Data) || Data <- emqx_bridge:list_bridges()]. update_config_and_reply(Name, BridgeType, Config, Data) -> - case emqx_data_bridge:update_config({update, ?BRIDGE(Name, BridgeType, Config)}) of + case emqx_bridge:update_config({update, ?BRIDGE(Name, BridgeType, Config)}) of {ok, _} -> {200, #{code => 0, data => format_api_reply( emqx_resource_api:format_data(Data))}}; @@ -132,7 +132,7 @@ update_config_and_reply(Name, BridgeType, Config, Data) -> end. delete_config_and_reply(Name) -> - case emqx_data_bridge:update_config({delete, Name}) of + case emqx_bridge:update_config({delete, Name}) of {ok, _} -> {200, #{code => 0, data => #{}}}; {error, Reason} -> {500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}} diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge_app.erl b/apps/emqx_bridge/src/emqx_bridge_app.erl similarity index 87% rename from apps/emqx_data_bridge/src/emqx_data_bridge_app.erl rename to apps/emqx_bridge/src/emqx_bridge_app.erl index 859952480..cfefe118f 100644 --- a/apps/emqx_data_bridge/src/emqx_data_bridge_app.erl +++ b/apps/emqx_bridge/src/emqx_bridge_app.erl @@ -13,7 +13,7 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_data_bridge_app). +-module(emqx_bridge_app). -behaviour(application). @@ -22,9 +22,9 @@ -export([start/2, stop/1, pre_config_update/2]). start(_StartType, _StartArgs) -> - {ok, Sup} = emqx_data_bridge_sup:start_link(), - ok = emqx_data_bridge:load_bridges(), - emqx_config_handler:add_handler(emqx_data_bridge:config_key_path(), ?MODULE), + {ok, Sup} = emqx_bridge_sup:start_link(), + ok = emqx_bridge:load_bridges(), + emqx_config_handler:add_handler(emqx_bridge:config_key_path(), ?MODULE), {ok, Sup}. stop(_State) -> diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge_monitor.erl b/apps/emqx_bridge/src/emqx_bridge_monitor.erl similarity index 84% rename from apps/emqx_data_bridge/src/emqx_data_bridge_monitor.erl rename to apps/emqx_bridge/src/emqx_bridge_monitor.erl index 4917833ec..4b3695615 100644 --- a/apps/emqx_data_bridge/src/emqx_data_bridge_monitor.erl +++ b/apps/emqx_bridge/src/emqx_bridge_monitor.erl @@ -15,7 +15,7 @@ %%-------------------------------------------------------------------- %% This process monitors all the data bridges, and try to restart a bridge %% when one of it stopped. --module(emqx_data_bridge_monitor). +-module(emqx_bridge_monitor). -behaviour(gen_server). @@ -65,14 +65,18 @@ code_change(_OldVsn, State, _Extra) -> %%============================================================================ load_bridges(Configs) -> - lists:foreach(fun load_bridge/1, Configs). + lists:foreach(fun(Type, NamedConf) -> + lists:foreach(fun(Name, Conf) -> + load_bridge(Name, Type, Conf) + end, maps:to_list(NamedConf)) + end, maps:to_list(Configs)). %% TODO: move this monitor into emqx_resource %% emqx_resource:check_and_create_local(ResourceId, ResourceType, Config, #{keep_retry => true}). -load_bridge(#{name := Name, type := Type, config := Config}) -> +load_bridge(Name, Type, Config) -> case emqx_resource:create_local( - emqx_data_bridge:name_to_resource_id(Name), - emqx_data_bridge:resource_type(Type), Config) of + emqx_bridge:name_to_resource_id(Name), + emqx_bridge:resource_type(Type), Config) of {ok, already_created} -> ok; {ok, _} -> ok; {error, Reason} -> diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl new file mode 100644 index 000000000..f651ce189 --- /dev/null +++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl @@ -0,0 +1,17 @@ +-module(emqx_bridge_schema). + +-export([roots/0, fields/1]). + +%%====================================================================================== +%% Hocon Schema Definitions + +roots() -> ["bridges"]. + +fields("bridges") -> + [{mqtt, hoconsc:ref("mqtt")}]; + +fields("mqtt") -> + [{"?name"}, hoconsc:ref("mqtt_briage")]; + +fields("mqtt_briage") -> + emqx_connector_mqtt:fields("config"). diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge_sup.erl b/apps/emqx_bridge/src/emqx_bridge_sup.erl similarity index 86% rename from apps/emqx_data_bridge/src/emqx_data_bridge_sup.erl rename to apps/emqx_bridge/src/emqx_bridge_sup.erl index a699a72a0..fd12b1a99 100644 --- a/apps/emqx_data_bridge/src/emqx_data_bridge_sup.erl +++ b/apps/emqx_bridge/src/emqx_bridge_sup.erl @@ -13,7 +13,7 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_data_bridge_sup). +-module(emqx_bridge_sup). -behaviour(supervisor). @@ -31,11 +31,11 @@ init([]) -> intensity => 10, period => 10}, ChildSpecs = [ - #{id => emqx_data_bridge_monitor, - start => {emqx_data_bridge_monitor, start_link, []}, + #{id => emqx_bridge_monitor, + start => {emqx_bridge_monitor, start_link, []}, restart => permanent, type => worker, - modules => [emqx_data_bridge_monitor]} + modules => [emqx_bridge_monitor]} ], {ok, {SupFlags, ChildSpecs}}. diff --git a/apps/emqx_machine/src/emqx_machine.erl b/apps/emqx_machine/src/emqx_machine.erl index 3e5772b4a..97125d79f 100644 --- a/apps/emqx_machine/src/emqx_machine.erl +++ b/apps/emqx_machine/src/emqx_machine.erl @@ -140,7 +140,7 @@ reboot_apps() -> , emqx_statsd , emqx_resource , emqx_rule_engine - , emqx_data_bridge + , emqx_bridge , emqx_bridge_mqtt , emqx_plugin_libs , emqx_management diff --git a/apps/emqx_machine/src/emqx_machine_schema.erl b/apps/emqx_machine/src/emqx_machine_schema.erl index 4894bda98..1f5d639cd 100644 --- a/apps/emqx_machine/src/emqx_machine_schema.erl +++ b/apps/emqx_machine/src/emqx_machine_schema.erl @@ -43,7 +43,7 @@ %% by nodetool to generate app.