refactor(bridge): rename emqx_data_bridge to emqx_bridge

This commit is contained in:
Shawn 2021-09-06 17:32:46 +08:00
parent 5693981b54
commit bfb2df37ce
16 changed files with 107 additions and 51 deletions

View File

@ -1,4 +1,4 @@
# emqx_data_bridge # emqx_bridge
EMQ X Data Bridge is an application that managing the resources (see emqx_resource) used by emqx EMQ X Data Bridge is an application that managing the resources (see emqx_resource) used by emqx
rule engine. rule engine.

View File

@ -1,10 +1,46 @@
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## EMQ X Bridge Plugin ## EMQ X Bridge
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
emqx_data_bridge { bridges.mqtt.my_mqtt_bridge {
bridges:[ server = "127.0.0.1:1883"
# {name: "mysql_bridge_1" proto_ver = "v4"
clientid = "client1"
username = "username1"
password = ""
clean_start = true
keepalive = 300
retry_interval = "30s"
max_inflight = 32
reconnect_interval = "30s"
bridge_mode = true
replayq {
dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/"
seg_bytes = "100MB"
offload = false
max_total_bytes = "1GB"
}
ssl {
enable = false
keyfile = "{{ platform_etc_dir }}/certs/client-key.pem"
certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
}
in [{
from_remote_topic = "msg/#"
to_local_topic = "from_aws/${topic}"
payload_template = "${message}"
qos = 1
}]
out [{
from_local_topic = "msg/#"
to_remote_topic = "from_emqx/${topic}"
payload_template = "${message}"
}]
}
# {name: "mysql_bridge_1"
# type: mysql # type: mysql
# config: { # config: {
# server: "192.168.0.172:3306" # server: "192.168.0.172:3306"
@ -123,7 +159,4 @@ emqx_data_bridge {
# pool_size: 1 # pool_size: 1
# ssl: false # ssl: false
# } # }
# } # }
]
}

View File

@ -3,5 +3,5 @@
{shell, [ {shell, [
% {config, "config/sys.config"}, % {config, "config/sys.config"},
{apps, [emqx_data_bridge]} {apps, [emqx_bridge]}
]}. ]}.

View File

@ -1,8 +1,8 @@
{application, emqx_data_bridge, {application, emqx_bridge,
[{description, "An OTP application"}, [{description, "An OTP application"},
{vsn, "0.1.0"}, {vsn, "0.1.0"},
{registered, []}, {registered, []},
{mod, {emqx_data_bridge_app, []}}, {mod, {emqx_bridge_app, []}},
{applications, {applications,
[kernel, [kernel,
stdlib, stdlib,

View File

@ -13,7 +13,7 @@
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_data_bridge). -module(emqx_bridge).
-export([ load_bridges/0 -export([ load_bridges/0
, resource_type/1 , resource_type/1
@ -27,15 +27,17 @@
]). ]).
load_bridges() -> load_bridges() ->
Bridges = emqx:get_config([emqx_data_bridge, bridges], []), Bridges = emqx:get_config([bridges], #{}),
emqx_data_bridge_monitor:ensure_all_started(Bridges). emqx_bridge_monitor:ensure_all_started(Bridges).
resource_type(mqtt) -> emqx_connector_mqtt;
resource_type(mysql) -> emqx_connector_mysql; resource_type(mysql) -> emqx_connector_mysql;
resource_type(pgsql) -> emqx_connector_pgsql; resource_type(pgsql) -> emqx_connector_pgsql;
resource_type(mongo) -> emqx_connector_mongo; resource_type(mongo) -> emqx_connector_mongo;
resource_type(redis) -> emqx_connector_redis; resource_type(redis) -> emqx_connector_redis;
resource_type(ldap) -> emqx_connector_ldap. resource_type(ldap) -> emqx_connector_ldap.
bridge_type(emqx_connector_mqtt) -> mqtt;
bridge_type(emqx_connector_mysql) -> mysql; bridge_type(emqx_connector_mysql) -> mysql;
bridge_type(emqx_connector_pgsql) -> pgsql; bridge_type(emqx_connector_pgsql) -> pgsql;
bridge_type(emqx_connector_mongo) -> mongo; bridge_type(emqx_connector_mongo) -> mongo;
@ -49,7 +51,7 @@ resource_id_to_name(<<"bridge:", BridgeName/binary>> = _ResourceId) ->
BridgeName. BridgeName.
list_bridges() -> list_bridges() ->
emqx_resource_api:list_instances(fun emqx_data_bridge:is_bridge/1). emqx_resource_api:list_instances(fun emqx_bridge:is_bridge/1).
is_bridge(#{id := <<"bridge:", _/binary>>}) -> is_bridge(#{id := <<"bridge:", _/binary>>}) ->
true; true;
@ -57,7 +59,7 @@ is_bridge(_Data) ->
false. false.
config_key_path() -> config_key_path() ->
[emqx_data_bridge, bridges]. [emqx_bridge, bridges].
update_config(ConfigReq) -> update_config(ConfigReq) ->
emqx:update_config(config_key_path(), ConfigReq). emqx:update_config(config_key_path(), ConfigReq).

View File

@ -13,7 +13,7 @@
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_data_bridge_api). -module(emqx_bridge_api).
-rest_api(#{ name => list_data_bridges -rest_api(#{ name => list_data_bridges
, method => 'GET' , method => 'GET'
@ -61,10 +61,10 @@
list_bridges(_Binding, _Params) -> list_bridges(_Binding, _Params) ->
{200, #{code => 0, data => [format_api_reply(Data) || {200, #{code => 0, data => [format_api_reply(Data) ||
Data <- emqx_data_bridge:list_bridges()]}}. Data <- emqx_bridge:list_bridges()]}}.
get_bridge(#{name := Name}, _Params) -> get_bridge(#{name := Name}, _Params) ->
case emqx_resource:get_instance(emqx_data_bridge:name_to_resource_id(Name)) of case emqx_resource:get_instance(emqx_bridge:name_to_resource_id(Name)) of
{ok, Data} -> {ok, Data} ->
{200, #{code => 0, data => format_api_reply(emqx_resource_api:format_data(Data))}}; {200, #{code => 0, data => format_api_reply(emqx_resource_api:format_data(Data))}};
{error, not_found} -> {error, not_found} ->
@ -75,8 +75,8 @@ create_bridge(#{name := Name}, Params) ->
Config = proplists:get_value(<<"config">>, Params), Config = proplists:get_value(<<"config">>, Params),
BridgeType = proplists:get_value(<<"type">>, Params), BridgeType = proplists:get_value(<<"type">>, Params),
case emqx_resource:check_and_create( case emqx_resource:check_and_create(
emqx_data_bridge:name_to_resource_id(Name), emqx_bridge:name_to_resource_id(Name),
emqx_data_bridge:resource_type(atom(BridgeType)), maps:from_list(Config)) of emqx_bridge:resource_type(atom(BridgeType)), maps:from_list(Config)) of
{ok, already_created} -> {ok, already_created} ->
{400, #{code => 102, message => <<"bridge already created: ", Name/binary>>}}; {400, #{code => 102, message => <<"bridge already created: ", Name/binary>>}};
{ok, Data} -> {ok, Data} ->
@ -91,8 +91,8 @@ update_bridge(#{name := Name}, Params) ->
Config = proplists:get_value(<<"config">>, Params), Config = proplists:get_value(<<"config">>, Params),
BridgeType = proplists:get_value(<<"type">>, Params), BridgeType = proplists:get_value(<<"type">>, Params),
case emqx_resource:check_and_update( case emqx_resource:check_and_update(
emqx_data_bridge:name_to_resource_id(Name), emqx_bridge:name_to_resource_id(Name),
emqx_data_bridge:resource_type(atom(BridgeType)), maps:from_list(Config), []) of emqx_bridge:resource_type(atom(BridgeType)), maps:from_list(Config), []) of
{ok, Data} -> {ok, Data} ->
update_config_and_reply(Name, BridgeType, Config, Data); update_config_and_reply(Name, BridgeType, Config, Data);
{error, not_found} -> {error, not_found} ->
@ -104,26 +104,26 @@ update_bridge(#{name := Name}, Params) ->
end. end.
delete_bridge(#{name := Name}, _Params) -> delete_bridge(#{name := Name}, _Params) ->
case emqx_resource:remove(emqx_data_bridge:name_to_resource_id(Name)) of case emqx_resource:remove(emqx_bridge:name_to_resource_id(Name)) of
ok -> delete_config_and_reply(Name); ok -> delete_config_and_reply(Name);
{error, Reason} -> {error, Reason} ->
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}} {500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
end. end.
format_api_reply(#{resource_type := Type, id := Id, config := Conf, status := Status}) -> format_api_reply(#{resource_type := Type, id := Id, config := Conf, status := Status}) ->
#{type => emqx_data_bridge:bridge_type(Type), #{type => emqx_bridge:bridge_type(Type),
name => emqx_data_bridge:resource_id_to_name(Id), name => emqx_bridge:resource_id_to_name(Id),
config => Conf, status => Status}. config => Conf, status => Status}.
% format_conf(#{resource_type := Type, id := Id, config := Conf}) -> % format_conf(#{resource_type := Type, id := Id, config := Conf}) ->
% #{type => Type, name => emqx_data_bridge:resource_id_to_name(Id), % #{type => Type, name => emqx_bridge:resource_id_to_name(Id),
% config => Conf}. % config => Conf}.
% get_all_configs() -> % get_all_configs() ->
% [format_conf(Data) || Data <- emqx_data_bridge:list_bridges()]. % [format_conf(Data) || Data <- emqx_bridge:list_bridges()].
update_config_and_reply(Name, BridgeType, Config, Data) -> update_config_and_reply(Name, BridgeType, Config, Data) ->
case emqx_data_bridge:update_config({update, ?BRIDGE(Name, BridgeType, Config)}) of case emqx_bridge:update_config({update, ?BRIDGE(Name, BridgeType, Config)}) of
{ok, _} -> {ok, _} ->
{200, #{code => 0, data => format_api_reply( {200, #{code => 0, data => format_api_reply(
emqx_resource_api:format_data(Data))}}; emqx_resource_api:format_data(Data))}};
@ -132,7 +132,7 @@ update_config_and_reply(Name, BridgeType, Config, Data) ->
end. end.
delete_config_and_reply(Name) -> delete_config_and_reply(Name) ->
case emqx_data_bridge:update_config({delete, Name}) of case emqx_bridge:update_config({delete, Name}) of
{ok, _} -> {200, #{code => 0, data => #{}}}; {ok, _} -> {200, #{code => 0, data => #{}}};
{error, Reason} -> {error, Reason} ->
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}} {500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}

View File

@ -13,7 +13,7 @@
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_data_bridge_app). -module(emqx_bridge_app).
-behaviour(application). -behaviour(application).
@ -22,9 +22,9 @@
-export([start/2, stop/1, pre_config_update/2]). -export([start/2, stop/1, pre_config_update/2]).
start(_StartType, _StartArgs) -> start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_data_bridge_sup:start_link(), {ok, Sup} = emqx_bridge_sup:start_link(),
ok = emqx_data_bridge:load_bridges(), ok = emqx_bridge:load_bridges(),
emqx_config_handler:add_handler(emqx_data_bridge:config_key_path(), ?MODULE), emqx_config_handler:add_handler(emqx_bridge:config_key_path(), ?MODULE),
{ok, Sup}. {ok, Sup}.
stop(_State) -> stop(_State) ->

View File

@ -15,7 +15,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% This process monitors all the data bridges, and try to restart a bridge %% This process monitors all the data bridges, and try to restart a bridge
%% when one of it stopped. %% when one of it stopped.
-module(emqx_data_bridge_monitor). -module(emqx_bridge_monitor).
-behaviour(gen_server). -behaviour(gen_server).
@ -65,14 +65,18 @@ code_change(_OldVsn, State, _Extra) ->
%%============================================================================ %%============================================================================
load_bridges(Configs) -> load_bridges(Configs) ->
lists:foreach(fun load_bridge/1, Configs). lists:foreach(fun(Type, NamedConf) ->
lists:foreach(fun(Name, Conf) ->
load_bridge(Name, Type, Conf)
end, maps:to_list(NamedConf))
end, maps:to_list(Configs)).
%% TODO: move this monitor into emqx_resource %% TODO: move this monitor into emqx_resource
%% emqx_resource:check_and_create_local(ResourceId, ResourceType, Config, #{keep_retry => true}). %% emqx_resource:check_and_create_local(ResourceId, ResourceType, Config, #{keep_retry => true}).
load_bridge(#{name := Name, type := Type, config := Config}) -> load_bridge(Name, Type, Config) ->
case emqx_resource:create_local( case emqx_resource:create_local(
emqx_data_bridge:name_to_resource_id(Name), emqx_bridge:name_to_resource_id(Name),
emqx_data_bridge:resource_type(Type), Config) of emqx_bridge:resource_type(Type), Config) of
{ok, already_created} -> ok; {ok, already_created} -> ok;
{ok, _} -> ok; {ok, _} -> ok;
{error, Reason} -> {error, Reason} ->

View File

@ -0,0 +1,17 @@
-module(emqx_bridge_schema).
-export([roots/0, fields/1]).
%%======================================================================================
%% Hocon Schema Definitions
roots() -> ["bridges"].
fields("bridges") ->
[{mqtt, hoconsc:ref("mqtt")}];
fields("mqtt") ->
[{"?name"}, hoconsc:ref("mqtt_briage")];
fields("mqtt_briage") ->
emqx_connector_mqtt:fields("config").

View File

@ -13,7 +13,7 @@
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_data_bridge_sup). -module(emqx_bridge_sup).
-behaviour(supervisor). -behaviour(supervisor).
@ -31,11 +31,11 @@ init([]) ->
intensity => 10, intensity => 10,
period => 10}, period => 10},
ChildSpecs = [ ChildSpecs = [
#{id => emqx_data_bridge_monitor, #{id => emqx_bridge_monitor,
start => {emqx_data_bridge_monitor, start_link, []}, start => {emqx_bridge_monitor, start_link, []},
restart => permanent, restart => permanent,
type => worker, type => worker,
modules => [emqx_data_bridge_monitor]} modules => [emqx_bridge_monitor]}
], ],
{ok, {SupFlags, ChildSpecs}}. {ok, {SupFlags, ChildSpecs}}.

View File

@ -140,7 +140,7 @@ reboot_apps() ->
, emqx_statsd , emqx_statsd
, emqx_resource , emqx_resource
, emqx_rule_engine , emqx_rule_engine
, emqx_data_bridge , emqx_bridge
, emqx_bridge_mqtt , emqx_bridge_mqtt
, emqx_plugin_libs , emqx_plugin_libs
, emqx_management , emqx_management

View File

@ -43,7 +43,7 @@
%% by nodetool to generate app.<time>.config before EMQ X is started %% by nodetool to generate app.<time>.config before EMQ X is started
-define(MERGED_CONFIGS, -define(MERGED_CONFIGS,
[ emqx_schema [ emqx_schema
, emqx_data_bridge_schema , emqx_bridge_schema
, emqx_retainer_schema , emqx_retainer_schema
, emqx_statsd_schema , emqx_statsd_schema
, emqx_authz_schema , emqx_authz_schema

View File

@ -3,7 +3,7 @@
The `emqx_resource` is a behavior that manages configuration specs and runtime states The `emqx_resource` is a behavior that manages configuration specs and runtime states
for resources like mysql or redis backends. for resources like mysql or redis backends.
It is intended to be used by the emqx_data_bridges and all other resources that need CRUD operations It is intended to be used by the emqx_bridges and all other resources that need CRUD operations
to their configs, and need to initialize the states when creating. to their configs, and need to initialize the states when creating.
There can be foreign references between resource instances via resource-id. There can be foreign references between resource instances via resource-id.

View File

@ -18,7 +18,7 @@ marp: true
The [emqx_resource](https://github.com/emqx/emqx/tree/master/apps/emqx_resource) is a behavior that manages configuration specs and runtime states for resources like mysql or redis backends. The [emqx_resource](https://github.com/emqx/emqx/tree/master/apps/emqx_resource) is a behavior that manages configuration specs and runtime states for resources like mysql or redis backends.
It is intended to be used by the emqx_data_bridges and all other resources that need CRUD operations to their configs, and need to initialize the states when creating. It is intended to be used by the emqx_bridges and all other resources that need CRUD operations to their configs, and need to initialize the states when creating.
--- ---
@ -55,10 +55,10 @@ on_health_check/2
``` ```
--- ---
## Start the emqx_data_bridge ## Start the emqx_bridge
``` ```
application:ensure_all_started(emqx_data_bridge). application:ensure_all_started(emqx_bridge).
``` ```
--- ---

View File

@ -269,7 +269,7 @@ relx_apps(ReleaseType) ->
, emqx_auto_subscribe , emqx_auto_subscribe
, emqx_gateway , emqx_gateway
, emqx_exhook , emqx_exhook
, emqx_data_bridge , emqx_bridge
, emqx_rule_engine , emqx_rule_engine
, emqx_rule_actions , emqx_rule_actions
, emqx_bridge_mqtt , emqx_bridge_mqtt