feat(emqx_resource): read and save configs from and to file

This commit is contained in:
Shawn 2021-06-02 01:58:26 +08:00
parent e7ffa07a1a
commit 5d52ce044d
12 changed files with 55 additions and 24 deletions

View File

@ -0,0 +1,4 @@
[{connectors,[#{<<"id">> => <<"mysql-abc">>,
<<"type">> => <<"mysql_connenctor">>},
#{<<"id">> => <<"pgsql-123">>,
<<"type">> => <<"pgsql_connenctor">>}]}].

View File

@ -2,18 +2,18 @@
## EMQ X CONNECTOR Plugin ## EMQ X CONNECTOR Plugin
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## Base directory for emqx_connector indicating where to load configs from disk. connectors: [
## {id: "mysql-abc"
## Value: String resource_type: emqx_connector_mysql
## Default: "{{ etc_dir }}/connectors/" config: {
emqx_connectors: [ server: "127.0.0.1:3306"
{id: "mysql-abc", database: mqtt
type: mysql_connenctor, pool_size: 1
config: {} user: root
}, password: public
{id: "pgsql-123", auto_reconnect: true
type: pgsql_connenctor, ssl: false
config: {} }
}, }
] ]

View File

@ -1,2 +1,7 @@
%%-*- mode: erlang -*- %%-*- mode: erlang -*-
%% emqx_connector config mapping %% emqx_connector config mapping
{mapping, "connectors", "connectors", [
{default, []},
{datatype, string}
]}.

View File

@ -22,7 +22,7 @@ stop(_State) ->
load_config() -> load_config() ->
case hocon:load("etc/plugins/emqx_connector.conf", #{format => map}) of case hocon:load("etc/plugins/emqx_connector.conf", #{format => map}) of
{ok, #{<<"emqx_connectors">> := Connectors}} -> {ok, #{<<"connectors">> := Connectors}} ->
lists:foreach(fun load_connector/1, Connectors); lists:foreach(fun load_connector/1, Connectors);
{error, Reason} -> {error, Reason} ->
error(Reason) error(Reason)

View File

@ -5,7 +5,9 @@
-emqx_resource_api_path("connectors/mysql"). -emqx_resource_api_path("connectors/mysql").
-export([fields/1]). -export([ fields/1
, on_config_to_file/1
]).
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ on_start/2 -export([ on_start/2
@ -18,10 +20,14 @@
-export([do_health_check/1]). -export([do_health_check/1]).
%%=====================================================================
fields("config") -> fields("config") ->
emqx_connector_schema_lib:relational_db_fields() ++ emqx_connector_schema_lib:relational_db_fields() ++
emqx_connector_schema_lib:ssl_fields(). emqx_connector_schema_lib:ssl_fields().
on_config_to_file(#{server := Server} = Config) ->
Config#{server => emqx_connector_schema_lib:ip_port_to_string(Server)}.
%% =================================================================== %% ===================================================================
on_start(InstId, #{server := {Host, Port}, on_start(InstId, #{server := {Host, Port},
@ -58,7 +64,7 @@ on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := PoolName} = State) ->
case Result = ecpool:pick_and_do(PoolName, {mysql, query, [SQL]}, no_handover) of case Result = ecpool:pick_and_do(PoolName, {mysql, query, [SQL]}, no_handover) of
{error, Reason} -> {error, Reason} ->
logger:debug("mysql connector ~p do sql query failed, sql: ~p, reason: ~p", [InstId, SQL, Reason]), logger:debug("mysql connector ~p do sql query failed, sql: ~p, reason: ~p", [InstId, SQL, Reason]),
emqx_resource:query_failure(AfterQuery); emqx_resource:query_failed(AfterQuery);
_ -> _ ->
emqx_resource:query_success(AfterQuery) emqx_resource:query_success(AfterQuery)
end, end,

View File

@ -6,6 +6,7 @@
]). ]).
-export([ to_ip_port/1 -export([ to_ip_port/1
, ip_port_to_string/1
]). ]).
-typerefl_from_string({ip_port/0, emqx_connector_schema_lib, to_ip_port}). -typerefl_from_string({ip_port/0, emqx_connector_schema_lib, to_ip_port}).
@ -100,3 +101,6 @@ to_ip_port(Str) ->
end; end;
_ -> {error, Str} _ -> {error, Str}
end. end.
ip_port_to_string({Ip, Port}) ->
inet:ntoa(Ip) ++ ":" ++ integer_to_list(Port).

View File

@ -15,7 +15,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-type resource_type() :: module(). -type resource_type() :: module().
-type instance_id() :: binary(). -type instance_id() :: binary().
-type resource_config() :: jsx:json_term(). -type resource_config() :: term().
-type resource_spec() :: map(). -type resource_spec() :: map().
-type resource_state() :: term(). -type resource_state() :: term().
-type resource_data() :: #{ -type resource_data() :: #{

View File

@ -7,7 +7,8 @@
[kernel, [kernel,
stdlib, stdlib,
gproc, gproc,
hocon hocon,
jsx
]}, ]},
{env,[]}, {env,[]},
{modules, []}, {modules, []},

View File

@ -65,6 +65,7 @@
, call_health_check/3 %% verify if the resource is working normally , call_health_check/3 %% verify if the resource is working normally
, call_stop/3 %% stop the instance , call_stop/3 %% stop the instance
, call_config_merge/4 %% merge the config when updating , call_config_merge/4 %% merge the config when updating
, call_config_to_file/2
]). ]).
-export([ list_instances/0 %% list all the instances, id only. -export([ list_instances/0 %% list all the instances, id only.
@ -85,12 +86,15 @@
, on_health_check/2 , on_health_check/2
, on_api_reply_format/1 , on_api_reply_format/1
, on_config_merge/3 , on_config_merge/3
, on_config_to_file/1
]). ]).
-callback on_api_reply_format(resource_data()) -> map(). -callback on_api_reply_format(resource_data()) -> map().
-callback on_config_merge(resource_config(), resource_config(), term()) -> resource_config(). -callback on_config_merge(resource_config(), resource_config(), term()) -> resource_config().
-callback on_config_to_file(resource_config()) -> jsx:json_term().
%% when calling emqx_resource:start/1 %% when calling emqx_resource:start/1
-callback on_start(instance_id(), resource_config()) -> -callback on_start(instance_id(), resource_config()) ->
{ok, resource_state()} | {error, Reason :: term()}. {ok, resource_state()} | {error, Reason :: term()}.
@ -241,6 +245,10 @@ call_stop(InstId, Mod, ResourceState) ->
call_config_merge(Mod, OldConfig, NewConfig, Params) -> call_config_merge(Mod, OldConfig, NewConfig, Params) ->
?SAFE_CALL(Mod:on_config_merge(OldConfig, NewConfig, Params)). ?SAFE_CALL(Mod:on_config_merge(OldConfig, NewConfig, Params)).
-spec call_config_to_file(module(), resource_config()) -> jsx:json_term().
call_config_to_file(Mod, Config) ->
?SAFE_CALL(Mod:on_config_to_file(Config)).
-spec parse_config(resource_type(), binary() | term()) -> -spec parse_config(resource_type(), binary() | term()) ->
{ok, resource_config()} | {error, term()}. {ok, resource_config()} | {error, term()}.
parse_config(ResourceType, RawConfig) when is_binary(RawConfig) -> parse_config(ResourceType, RawConfig) when is_binary(RawConfig) ->
@ -271,7 +279,7 @@ resource_type_from_str(ResourceType) ->
false -> {error, {invalid_resource, Mod}} false -> {error, {invalid_resource, Mod}}
end end
catch error:badarg -> catch error:badarg ->
{error, {resourec_not_found, ResourceType}} {error, {resource_not_found, ResourceType}}
end. end.
call_instance(InstId, Query) -> call_instance(InstId, Query) ->

View File

@ -34,7 +34,7 @@ get(Mod, #{id := Id}, _Params) ->
put(Mod, #{id := Id}, Params) -> put(Mod, #{id := Id}, Params) ->
ConfigParams = proplists:get_value(<<"config">>, Params), ConfigParams = proplists:get_value(<<"config">>, Params),
ResourceTypeStr = proplists:get_value(<<"resource_type">>, Params), ResourceTypeStr = proplists:get_value(<<"resource_type">>, Params, #{}),
case emqx_resource:resource_type_from_str(ResourceTypeStr) of case emqx_resource:resource_type_from_str(ResourceTypeStr) of
{ok, ResourceType} -> {ok, ResourceType} ->
do_put(Mod, stringnify(Id), ConfigParams, ResourceType, Params); do_put(Mod, stringnify(Id), ConfigParams, ResourceType, Params);

View File

@ -110,8 +110,8 @@ load_config(RawConfig) when is_binary(RawConfig) ->
Error -> Error Error -> Error
end; end;
load_config(#{<<"id">> := Id, <<"resource_type">> := ResourceTypeStr, load_config(#{<<"id">> := Id, <<"resource_type">> := ResourceTypeStr} = Config) ->
<<"config">> := MapConfig}) -> MapConfig = maps:get(<<"config">>, Config, #{}),
case emqx_resource:resource_type_from_str(ResourceTypeStr) of case emqx_resource:resource_type_from_str(ResourceTypeStr) of
{ok, ResourceType} -> parse_and_load_config(Id, ResourceType, MapConfig); {ok, ResourceType} -> parse_and_load_config(Id, ResourceType, MapConfig);
Error -> Error Error -> Error
@ -130,8 +130,11 @@ create_local(InstId, ResourceType, InstConf) ->
end. end.
save_config_to_disk(InstId, ResourceType, Config) -> save_config_to_disk(InstId, ResourceType, Config) ->
%% TODO: send an event to the config handler, and the hander (single process)
%% will dump configs for all instances (from an ETS table) to a file.
file:write_file(filename:join([emqx_data_dir(), binary_to_list(InstId) ++ ".conf"]), file:write_file(filename:join([emqx_data_dir(), binary_to_list(InstId) ++ ".conf"]),
jsx:encode(#{id => InstId, resource_type => ResourceType, config => Config})). jsx:encode(#{id => InstId, resource_type => ResourceType,
config => emqx_resource:call_config_to_file(Config)})).
emqx_data_dir() -> emqx_data_dir() ->
"data". "data".