refactor(rules): remove emqx_rule_actions

This commit is contained in:
Shawn 2021-09-24 19:41:02 +08:00
parent af295a9b71
commit bd081913b5
13 changed files with 1 additions and 1633 deletions

View File

@ -1,66 +0,0 @@
-module(emqx_rule_actions_trans).
-include_lib("syntax_tools/include/merl.hrl").
-export([parse_transform/2]).
parse_transform(Forms, _Options) ->
trans(Forms, []).
trans([], ResAST) ->
lists:reverse(ResAST);
trans([{eof, L} | AST], ResAST) ->
lists:reverse([{eof, L} | ResAST]) ++ AST;
trans([{function, LineNo, FuncName, Arity, Clauses} | AST], ResAST) ->
NewClauses = trans_func_clauses(atom_to_list(FuncName), Clauses),
trans(AST, [{function, LineNo, FuncName, Arity, NewClauses} | ResAST]);
trans([Form | AST], ResAST) ->
trans(AST, [Form | ResAST]).
trans_func_clauses("on_action_create_" ++ _ = _FuncName , Clauses) ->
NewClauses = [
begin
Bindings = lists:flatten(get_vars(Args) ++ get_vars(Body, lefth)),
Body2 = append_to_result(Bindings, Body),
{clause, LineNo, Args, Guards, Body2}
end || {clause, LineNo, Args, Guards, Body} <- Clauses],
NewClauses;
trans_func_clauses(_FuncName, Clauses) ->
Clauses.
get_vars(Exprs) ->
get_vars(Exprs, all).
get_vars(Exprs, Type) ->
do_get_vars(Exprs, [], Type).
do_get_vars([], Vars, _Type) -> Vars;
do_get_vars([Line | Expr], Vars, all) ->
do_get_vars(Expr, [syntax_vars(erl_syntax:form_list([Line])) | Vars], all);
do_get_vars([Line | Expr], Vars, lefth) ->
do_get_vars(Expr,
case (Line) of
?Q("_@LeftV = _@@_") -> Vars ++ syntax_vars(LeftV);
_ -> Vars
end, lefth).
syntax_vars(Line) ->
sets:to_list(erl_syntax_lib:variables(Line)).
%% append bindings to the return value as the first tuple element.
%% e.g. if the original result is R, then the new result will be {[binding()], R}.
append_to_result(Bindings, Exprs) ->
erl_syntax:revert_forms(do_append_to_result(to_keyword(Bindings), Exprs, [])).
do_append_to_result(KeyWordVars, [Line], Res) ->
case Line of
?Q("_@LeftV = _@RightV") ->
lists:reverse([?Q("{[_@KeyWordVars], _@LeftV}"), Line | Res]);
_ ->
lists:reverse([?Q("{[_@KeyWordVars], _@Line}") | Res])
end;
do_append_to_result(KeyWordVars, [Line | Exprs], Res) ->
do_append_to_result(KeyWordVars, Exprs, [Line | Res]).
to_keyword(Vars) ->
[erl_syntax:tuple([erl_syntax:atom(Var), merl:var(Var)])
|| Var <- Vars].

View File

@ -146,7 +146,6 @@ reboot_apps() ->
, emqx_management
, emqx_retainer
, emqx_exhook
, emqx_rule_actions
, emqx_authn
, emqx_authz
].

View File

@ -1,11 +0,0 @@
# emqx_rule_actions
This project contains a collection of rule actions/resources. It is mainly for
making unit test easier. Also it's easier for us to create utils that many
modules depends on it.
## Build
-----
$ rebar3 compile

View File

@ -1,25 +0,0 @@
{deps, []}.
{erl_opts, [warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
warn_obsolete_guard,
no_debug_info,
compressed, %% for edge
{parse_transform}
]}.
{overrides, [{add, [{erl_opts, [no_debug_info, compressed]}]}]}.
{edoc_opts, [{preprocess, true}]}.
{xref_checks, [undefined_function_calls, undefined_functions,
locals_not_used, deprecated_function_calls,
warnings_as_errors, deprecated_functions
]}.
{cover_enabled, true}.
{cover_opts, [verbose]}.
{cover_export_enabled, true}.
{plugins, [rebar3_proper]}.

View File

@ -1,576 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc This module implements EMQX Bridge transport layer on top of MQTT protocol
-module(emqx_bridge_mqtt_actions).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_rule_engine/include/rule_actions.hrl").
-import(emqx_plugin_libs_rule, [str/1]).
-export([ on_resource_create/2
, on_get_resource_status/2
, on_resource_destroy/2
]).
%% Callbacks of ecpool Worker
-export([connect/1]).
-export([subscriptions/1]).
-export([ on_action_create_data_to_mqtt_broker/2
, on_action_data_to_mqtt_broker/2
]).
-define(RESOURCE_TYPE_MQTT, 'bridge_mqtt').
-define(RESOURCE_TYPE_RPC, 'bridge_rpc').
-define(RESOURCE_CONFIG_SPEC_MQTT, #{
address => #{
order => 1,
type => string,
required => true,
default => <<"127.0.0.1:1883">>,
title => #{en => <<" Broker Address">>,
zh => <<"远程 broker 地址"/utf8>>},
description => #{en => <<"The MQTT Remote Address">>,
zh => <<"远程 MQTT Broker 的地址"/utf8>>}
},
pool_size => #{
order => 2,
type => number,
required => true,
default => 8,
title => #{en => <<"Pool Size">>,
zh => <<"连接池大小"/utf8>>},
description => #{en => <<"MQTT Connection Pool Size">>,
zh => <<"连接池大小"/utf8>>}
},
clientid => #{
order => 3,
type => string,
required => true,
default => <<"client">>,
title => #{en => <<"ClientId">>,
zh => <<"客户端 Id"/utf8>>},
description => #{en => <<"ClientId for connecting to remote MQTT broker">>,
zh => <<"连接远程 Broker 的 ClientId"/utf8>>}
},
append => #{
order => 4,
type => boolean,
required => false,
default => true,
title => #{en => <<"Append GUID">>,
zh => <<"附加 GUID"/utf8>>},
description => #{en => <<"Append GUID to MQTT ClientId?">>,
zh => <<"是否将GUID附加到 MQTT ClientId 后"/utf8>>}
},
username => #{
order => 5,
type => string,
required => false,
default => <<"">>,
title => #{en => <<"Username">>, zh => <<"用户名"/utf8>>},
description => #{en => <<"Username for connecting to remote MQTT Broker">>,
zh => <<"连接远程 Broker 的用户名"/utf8>>}
},
password => #{
order => 6,
type => password,
required => false,
default => <<"">>,
title => #{en => <<"Password">>,
zh => <<"密码"/utf8>>},
description => #{en => <<"Password for connecting to remote MQTT Broker">>,
zh => <<"连接远程 Broker 的密码"/utf8>>}
},
mountpoint => #{
order => 7,
type => string,
required => false,
default => <<"bridge/aws/${node}/">>,
title => #{en => <<"Bridge MountPoint">>,
zh => <<"桥接挂载点"/utf8>>},
description => #{
en => <<"MountPoint for bridge topic:<br/>"
"Example: The topic of messages sent to `topic1` on local node "
"will be transformed to `bridge/aws/${node}/topic1`">>,
zh => <<"桥接主题的挂载点:<br/>"
"示例: 本地节点向 `topic1` 发消息,远程桥接节点的主题"
"会变换为 `bridge/aws/${node}/topic1`"/utf8>>
}
},
disk_cache => #{
order => 8,
type => boolean,
required => false,
default => false,
title => #{en => <<"Disk Cache">>,
zh => <<"磁盘缓存"/utf8>>},
description => #{en => <<"The flag which determines whether messages "
"can be cached on local disk when bridge is "
"disconnected">>,
zh => <<"当桥接断开时用于控制是否将消息缓存到本地磁"
"盘队列上"/utf8>>}
},
proto_ver => #{
order => 9,
type => string,
required => false,
default => <<"mqttv4">>,
enum => [<<"mqttv3">>, <<"mqttv4">>, <<"mqttv5">>],
title => #{en => <<"Protocol Version">>,
zh => <<"协议版本"/utf8>>},
description => #{en => <<"MQTTT Protocol version">>,
zh => <<"MQTT 协议版本"/utf8>>}
},
keepalive => #{
order => 10,
type => string,
required => false,
default => <<"60s">> ,
title => #{en => <<"Keepalive">>,
zh => <<"心跳间隔"/utf8>>},
description => #{en => <<"Keepalive">>,
zh => <<"心跳间隔"/utf8>>}
},
reconnect_interval => #{
order => 11,
type => string,
required => false,
default => <<"30s">>,
title => #{en => <<"Reconnect Interval">>,
zh => <<"重连间隔"/utf8>>},
description => #{en => <<"Reconnect interval of bridge:<br/>">>,
zh => <<"重连间隔"/utf8>>}
},
retry_interval => #{
order => 12,
type => string,
required => false,
default => <<"20s">>,
title => #{en => <<"Retry interval">>,
zh => <<"重传间隔"/utf8>>},
description => #{en => <<"Retry interval for bridge QoS1 message delivering">>,
zh => <<"消息重传间隔"/utf8>>}
},
bridge_mode => #{
order => 13,
type => boolean,
required => false,
default => false,
title => #{en => <<"Bridge Mode">>,
zh => <<"桥接模式"/utf8>>},
description => #{en => <<"Bridge mode for MQTT bridge connection">>,
zh => <<"MQTT 连接是否为桥接模式"/utf8>>}
},
ssl => #{
order => 14,
type => boolean,
default => false,
title => #{en => <<"Enable SSL">>,
zh => <<"开启SSL链接"/utf8>>},
description => #{en => <<"Enable SSL or not">>,
zh => <<"是否开启 SSL"/utf8>>}
},
cacertfile => #{
order => 15,
type => file,
required => false,
default => <<"etc/certs/cacert.pem">>,
title => #{en => <<"CA certificates">>,
zh => <<"CA 证书"/utf8>>},
description => #{en => <<"The file path of the CA certificates">>,
zh => <<"CA 证书路径"/utf8>>}
},
certfile => #{
order => 16,
type => file,
required => false,
default => <<"etc/certs/client-cert.pem">>,
title => #{en => <<"SSL Certfile">>,
zh => <<"SSL 客户端证书"/utf8>>},
description => #{en => <<"The file path of the client certfile">>,
zh => <<"客户端证书路径"/utf8>>}
},
keyfile => #{
order => 17,
type => file,
required => false,
default => <<"etc/certs/client-key.pem">>,
title => #{en => <<"SSL Keyfile">>,
zh => <<"SSL 密钥文件"/utf8>>},
description => #{en => <<"The file path of the client keyfile">>,
zh => <<"客户端密钥路径"/utf8>>}
},
ciphers => #{
order => 18,
type => string,
required => false,
default => <<"ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,",
"ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,",
"ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,",
"ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,",
"AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,",
"ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,",
"ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,",
"DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,",
"ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,",
"ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,",
"DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA">>,
title => #{en => <<"SSL Ciphers">>,
zh => <<"SSL 加密算法"/utf8>>},
description => #{en => <<"SSL Ciphers">>,
zh => <<"SSL 加密算法"/utf8>>}
}
}).
-define(RESOURCE_CONFIG_SPEC_RPC, #{
address => #{
order => 1,
type => string,
required => true,
default => <<"emqx2@127.0.0.1">>,
title => #{en => <<"EMQ X Node Name">>,
zh => <<"EMQ X 节点名称"/utf8>>},
description => #{en => <<"EMQ X Remote Node Name">>,
zh => <<"远程 EMQ X 节点名称 "/utf8>>}
},
mountpoint => #{
order => 2,
type => string,
required => false,
default => <<"bridge/emqx/${node}/">>,
title => #{en => <<"Bridge MountPoint">>,
zh => <<"桥接挂载点"/utf8>>},
description => #{en => <<"MountPoint for bridge topic<br/>"
"Example: The topic of messages sent to `topic1` on local node "
"will be transformed to `bridge/aws/${node}/topic1`">>,
zh => <<"桥接主题的挂载点<br/>"
"示例: 本地节点向 `topic1` 发消息,远程桥接节点的主题"
"会变换为 `bridge/aws/${node}/topic1`"/utf8>>}
},
pool_size => #{
order => 3,
type => number,
required => true,
default => 8,
title => #{en => <<"Pool Size">>,
zh => <<"连接池大小"/utf8>>},
description => #{en => <<"MQTT/RPC Connection Pool Size">>,
zh => <<"连接池大小"/utf8>>}
},
reconnect_interval => #{
order => 4,
type => string,
required => false,
default => <<"30s">>,
title => #{en => <<"Reconnect Interval">>,
zh => <<"重连间隔"/utf8>>},
description => #{en => <<"Reconnect Interval of bridge">>,
zh => <<"重连间隔"/utf8>>}
},
batch_size => #{
order => 5,
type => number,
required => false,
default => 32,
title => #{en => <<"Batch Size">>,
zh => <<"批处理大小"/utf8>>},
description => #{en => <<"Batch Size">>,
zh => <<"批处理大小"/utf8>>}
},
disk_cache => #{
order => 6,
type => boolean,
required => false,
default => false,
title => #{en => <<"Disk Cache">>,
zh => <<"磁盘缓存"/utf8>>},
description => #{en => <<"The flag which determines whether messages "
"can be cached on local disk when bridge is "
"disconnected">>,
zh => <<"当桥接断开时用于控制是否将消息缓存到本地磁"
"盘队列上"/utf8>>}
}
}).
-define(ACTION_PARAM_RESOURCE, #{
type => string,
required => true,
title => #{en => <<"Resource ID">>, zh => <<"资源 ID"/utf8>>},
description => #{en => <<"Bind a resource to this action">>,
zh => <<"给动作绑定一个资源"/utf8>>}
}).
-resource_type(#{
name => ?RESOURCE_TYPE_MQTT,
create => on_resource_create,
status => on_get_resource_status,
destroy => on_resource_destroy,
params => ?RESOURCE_CONFIG_SPEC_MQTT,
title => #{en => <<"MQTT Bridge">>, zh => <<"MQTT Bridge"/utf8>>},
description => #{en => <<"MQTT Message Bridge">>, zh => <<"MQTT 消息桥接"/utf8>>}
}).
-resource_type(#{
name => ?RESOURCE_TYPE_RPC,
create => on_resource_create,
status => on_get_resource_status,
destroy => on_resource_destroy,
params => ?RESOURCE_CONFIG_SPEC_RPC,
title => #{en => <<"EMQX Bridge">>, zh => <<"EMQX Bridge"/utf8>>},
description => #{en => <<"EMQ X RPC Bridge">>, zh => <<"EMQ X RPC 消息桥接"/utf8>>}
}).
-rule_action(#{
name => data_to_mqtt_broker,
category => data_forward,
for => 'message.publish',
types => [?RESOURCE_TYPE_MQTT, ?RESOURCE_TYPE_RPC],
create => on_action_create_data_to_mqtt_broker,
params => #{'$resource' => ?ACTION_PARAM_RESOURCE,
forward_topic => #{
order => 1,
type => string,
required => false,
default => <<"">>,
title => #{en => <<"Forward Topic">>,
zh => <<"转发消息主题"/utf8>>},
description => #{en => <<"The topic used when forwarding the message. "
"Defaults to the topic of the bridge message if not provided.">>,
zh => <<"转发消息时使用的主题。如果未提供,则默认为桥接消息的主题。"/utf8>>}
},
payload_tmpl => #{
order => 2,
type => string,
input => textarea,
required => false,
default => <<"">>,
title => #{en => <<"Payload Template">>,
zh => <<"消息内容模板"/utf8>>},
description => #{en => <<"The payload template, variable interpolation is supported. "
"If using empty template (default), then the payload will be "
"all the available vars in JSON format">>,
zh => <<"消息内容模板,支持变量。"
"若使用空模板(默认),消息内容为 JSON 格式的所有字段"/utf8>>}
}
},
title => #{en => <<"Data bridge to MQTT Broker">>,
zh => <<"桥接数据到 MQTT Broker"/utf8>>},
description => #{en => <<"Bridge Data to MQTT Broker">>,
zh => <<"桥接数据到 MQTT Broker"/utf8>>}
}).
on_resource_create(ResId, Params) ->
?LOG(info, "Initiating Resource ~p, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]),
{ok, _} = application:ensure_all_started(ecpool),
PoolName = pool_name(ResId),
Options = options(Params, PoolName, ResId),
start_resource(ResId, PoolName, Options),
case test_resource_status(PoolName) of
true -> ok;
false ->
on_resource_destroy(ResId, #{<<"pool">> => PoolName}),
error({{?RESOURCE_TYPE_MQTT, ResId}, connection_failed})
end,
#{<<"pool">> => PoolName}.
start_resource(ResId, PoolName, Options) ->
case ecpool:start_sup_pool(PoolName, ?MODULE, Options) of
{ok, _} ->
?LOG(info, "Initiated Resource ~p Successfully, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]);
{error, {already_started, _Pid}} ->
on_resource_destroy(ResId, #{<<"pool">> => PoolName}),
start_resource(ResId, PoolName, Options);
{error, Reason} ->
?LOG(error, "Initiate Resource ~p failed, ResId: ~p, ~p", [?RESOURCE_TYPE_MQTT, ResId, Reason]),
on_resource_destroy(ResId, #{<<"pool">> => PoolName}),
error({{?RESOURCE_TYPE_MQTT, ResId}, create_failed})
end.
test_resource_status(PoolName) ->
IsConnected = fun(Worker) ->
case ecpool_worker:client(Worker) of
{ok, Bridge} ->
try emqx_connector_mqtt_worker:status(Bridge) of
connected -> true;
_ -> false
catch _Error:_Reason ->
false
end;
{error, _} ->
false
end
end,
Status = [IsConnected(Worker) || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
lists:any(fun(St) -> St =:= true end, Status).
-spec(on_get_resource_status(ResId::binary(), Params::map()) -> Status::map()).
on_get_resource_status(_ResId, #{<<"pool">> := PoolName}) ->
IsAlive = test_resource_status(PoolName),
#{is_alive => IsAlive}.
on_resource_destroy(ResId, #{<<"pool">> := PoolName}) ->
?LOG(info, "Destroying Resource ~p, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]),
case ecpool:stop_sup_pool(PoolName) of
ok ->
?LOG(info, "Destroyed Resource ~p Successfully, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]);
{error, Reason} ->
?LOG(error, "Destroy Resource ~p failed, ResId: ~p, ~p", [?RESOURCE_TYPE_MQTT, ResId, Reason]),
error({{?RESOURCE_TYPE_MQTT, ResId}, destroy_failed})
end.
on_action_create_data_to_mqtt_broker(ActId, Opts = #{<<"pool">> := PoolName,
<<"forward_topic">> := ForwardTopic,
<<"payload_tmpl">> := PayloadTmpl}) ->
?LOG(info, "Initiating Action ~p.", [?FUNCTION_NAME]),
PayloadTks = emqx_plugin_libs_rule:preproc_tmpl(PayloadTmpl),
TopicTks = case ForwardTopic == <<"">> of
true -> undefined;
false -> emqx_plugin_libs_rule:preproc_tmpl(ForwardTopic)
end,
Opts.
on_action_data_to_mqtt_broker(Msg, _Env =
#{id := Id, clientid := From, flags := Flags,
topic := Topic, timestamp := TimeStamp, qos := QoS,
?BINDING_KEYS := #{
'ActId' := ActId,
'PoolName' := PoolName,
'TopicTks' := TopicTks,
'PayloadTks' := PayloadTks
}}) ->
Topic1 = case TopicTks =:= undefined of
true -> Topic;
false -> emqx_plugin_libs_rule:proc_tmpl(TopicTks, Msg)
end,
BrokerMsg = #message{id = Id,
qos = QoS,
from = From,
flags = Flags,
topic = Topic1,
payload = format_data(PayloadTks, Msg),
timestamp = TimeStamp},
ecpool:with_client(PoolName,
fun(BridgePid) ->
BridgePid ! {deliver, rule_engine, BrokerMsg}
end),
emqx_rule_metrics:inc_actions_success(ActId).
format_data([], Msg) ->
emqx_json:encode(Msg);
format_data(Tokens, Msg) ->
emqx_plugin_libs_rule:proc_tmpl(Tokens, Msg).
subscriptions(Subscriptions) ->
scan_binary(<<"[", Subscriptions/binary, "].">>).
is_node_addr(Addr0) ->
Addr = binary_to_list(Addr0),
case string:tokens(Addr, "@") of
[_NodeName, _Hostname] -> true;
_ -> false
end.
scan_binary(Bin) ->
TermString = binary_to_list(Bin),
scan_string(TermString).
scan_string(TermString) ->
{ok, Tokens, _} = erl_scan:string(TermString),
{ok, Term} = erl_parse:parse_term(Tokens),
Term.
connect(Options) when is_list(Options) ->
connect(maps:from_list(Options));
connect(Options = #{disk_cache := DiskCache, ecpool_worker_id := Id, pool_name := Pool}) ->
Options0 = case DiskCache of
true ->
DataDir = filename:join([emqx:get_config([node, data_dir]), replayq, Pool, integer_to_list(Id)]),
QueueOption = #{replayq_dir => DataDir},
Options#{queue => QueueOption};
false ->
Options
end,
Options1 = case maps:is_key(append, Options0) of
false -> Options0;
true ->
case maps:get(append, Options0, false) of
true ->
ClientId = lists:concat([str(maps:get(clientid, Options0)), "_", str(emqx_guid:to_hexstr(emqx_guid:gen()))]),
Options0#{clientid => ClientId};
false ->
Options0
end
end,
Options2 = maps:without([ecpool_worker_id, pool_name, append], Options1),
emqx_connector_mqtt_worker:start_link(Options2#{name => name(Pool, Id)}).
name(Pool, Id) ->
list_to_atom(atom_to_list(Pool) ++ ":" ++ integer_to_list(Id)).
pool_name(ResId) ->
list_to_atom("bridge_mqtt:" ++ str(ResId)).
options(Options, PoolName, ResId) ->
GetD = fun(Key, Default) -> maps:get(Key, Options, Default) end,
Get = fun(Key) -> GetD(Key, undefined) end,
Address = Get(<<"address">>),
[{max_inflight_batches, 32},
{forward_mountpoint, str(Get(<<"mountpoint">>))},
{disk_cache, GetD(<<"disk_cache">>, false)},
{start_type, auto},
{reconnect_delay_ms, hocon_postprocess:duration(str(Get(<<"reconnect_interval">>)))},
{if_record_metrics, false},
{pool_size, GetD(<<"pool_size">>, 1)},
{pool_name, PoolName}
] ++ case is_node_addr(Address) of
true ->
[{address, binary_to_atom(Get(<<"address">>), utf8)},
{connect_module, emqx_bridge_rpc},
{batch_size, Get(<<"batch_size">>)}];
false ->
[{address, binary_to_list(Address)},
{bridge_mode, GetD(<<"bridge_mode">>, true)},
{clean_start, true},
{clientid, str(Get(<<"clientid">>))},
{append, Get(<<"append">>)},
{connect_module, emqx_bridge_mqtt},
{keepalive, hocon_postprocess:duration(str(Get(<<"keepalive">>))) div 1000},
{username, str(Get(<<"username">>))},
{password, str(Get(<<"password">>))},
{proto_ver, mqtt_ver(Get(<<"proto_ver">>))},
{retry_interval, hocon_postprocess:duration(str(GetD(<<"retry_interval">>, "30s"))) div 1000}
| maybe_ssl(Options, Get(<<"ssl">>), ResId)]
end.
maybe_ssl(_Options, false, _ResId) ->
[];
maybe_ssl(Options, true, ResId) ->
[{ssl, true}, {ssl_opts, emqx_plugin_libs_ssl:save_files_return_opts(Options, "rules", ResId)}].
mqtt_ver(ProtoVer) ->
case ProtoVer of
<<"mqttv3">> -> v3;
<<"mqttv4">> -> v4;
<<"mqttv5">> -> v5;
_ -> v4
end.

View File

@ -1,12 +0,0 @@
%% -*- mode: erlang -*-
{application, emqx_rule_actions,
[{description, "Rule actions"},
{vsn, "5.0.0"},
{registered, []},
{applications,
[kernel,stdlib,emqx]},
{env,[]},
{modules, []},
{licenses, ["Apache 2.0"]},
{links, []}
]}.

View File

@ -1,379 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% Define the default actions.
-module(emqx_web_hook_actions).
-export([ on_resource_create/2
, on_get_resource_status/2
, on_resource_destroy/2
]).
-export([ on_action_create_data_to_webserver/2
, on_action_data_to_webserver/2
]).
-export_type([action_fun/0]).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_rule_engine/include/rule_actions.hrl").
-type(action_fun() :: fun((Data :: map(), Envs :: map()) -> Result :: any())).
-type(url() :: binary()).
-define(RESOURCE_TYPE_WEBHOOK, 'web_hook').
-define(RESOURCE_CONFIG_SPEC, #{
url => #{order => 1,
type => string,
format => url,
required => true,
title => #{en => <<"Request URL">>,
zh => <<"请求 URL"/utf8>>},
description => #{en => <<"The URL of the server that will receive the Webhook requests.">>,
zh => <<"用于接收 Webhook 请求的服务器的 URL。"/utf8>>}},
connect_timeout => #{order => 2,
type => string,
default => <<"5s">>,
title => #{en => <<"Connect Timeout">>,
zh => <<"连接超时时间"/utf8>>},
description => #{en => <<"Connect Timeout In Seconds">>,
zh => <<"连接超时时间"/utf8>>}},
request_timeout => #{order => 3,
type => string,
default => <<"5s">>,
title => #{en => <<"Request Timeout">>,
zh => <<"请求超时时间时间"/utf8>>},
description => #{en => <<"Request Timeout In Seconds">>,
zh => <<"请求超时时间"/utf8>>}},
pool_size => #{order => 4,
type => number,
default => 8,
title => #{en => <<"Pool Size">>, zh => <<"连接池大小"/utf8>>},
description => #{en => <<"Connection Pool">>,
zh => <<"连接池大小"/utf8>>}
},
cacertfile => #{order => 5,
type => file,
default => <<"">>,
title => #{en => <<"CA Certificate File">>,
zh => <<"CA 证书文件"/utf8>>},
description => #{en => <<"CA Certificate file">>,
zh => <<"CA 证书文件"/utf8>>}},
keyfile => #{order => 6,
type => file,
default => <<"">>,
title =>#{en => <<"SSL Key">>,
zh => <<"SSL Key"/utf8>>},
description => #{en => <<"Your ssl keyfile">>,
zh => <<"SSL 私钥"/utf8>>}},
certfile => #{order => 7,
type => file,
default => <<"">>,
title => #{en => <<"SSL Cert">>,
zh => <<"SSL Cert"/utf8>>},
description => #{en => <<"Your ssl certfile">>,
zh => <<"SSL 证书"/utf8>>}},
verify => #{order => 8,
type => boolean,
default => false,
title => #{en => <<"Verify Server Certfile">>,
zh => <<"校验服务器证书"/utf8>>},
description => #{en => <<"Whether to verify the server certificate. By default, the client will not verify the server's certificate. If verification is required, please set it to true.">>,
zh => <<"是否校验服务器证书。 默认客户端不会去校验服务器的证书如果需要校验请设置成true。"/utf8>>}},
server_name_indication => #{order => 9,
type => string,
title => #{en => <<"Server Name Indication">>,
zh => <<"服务器名称指示"/utf8>>},
description => #{en => <<"Specify the hostname used for peer certificate verification, or set to disable to turn off this verification.">>,
zh => <<"指定用于对端证书验证时使用的主机名,或者设置为 disable 以关闭此项验证。"/utf8>>}}
}).
-define(ACTION_PARAM_RESOURCE, #{
order => 0,
type => string,
required => true,
title => #{en => <<"Resource ID">>,
zh => <<"资源 ID"/utf8>>},
description => #{en => <<"Bind a resource to this action">>,
zh => <<"给动作绑定一个资源"/utf8>>}
}).
-define(ACTION_DATA_SPEC, #{
'$resource' => ?ACTION_PARAM_RESOURCE,
method => #{
order => 1,
type => string,
enum => [<<"POST">>, <<"DELETE">>, <<"PUT">>, <<"GET">>],
default => <<"POST">>,
title => #{en => <<"Method">>,
zh => <<"Method"/utf8>>},
description => #{en => <<"HTTP Method.\n"
"Note that: the Body option in the Action will be discarded in case of GET or DELETE method.">>,
zh => <<"HTTP Method。\n"
"注意:当方法为 GET 或 DELETE 时,动作中的 Body 选项会被忽略。"/utf8>>}},
path => #{
order => 2,
type => string,
required => false,
default => <<"">>,
title => #{en => <<"Path">>,
zh => <<"Path"/utf8>>},
description => #{en => <<"The path part of the URL, support using ${Var} to get the field value output by the rule.">>,
zh => <<"URL 的路径部分,支持使用 ${Var} 获取规则输出的字段值。\n"/utf8>>}
},
headers => #{
order => 3,
type => object,
schema => #{},
default => #{<<"content-type">> => <<"application/json">>},
title => #{en => <<"Headers">>,
zh => <<"Headers"/utf8>>},
description => #{en => <<"HTTP headers.">>,
zh => <<"HTTP headers。"/utf8>>}},
body => #{
order => 4,
type => string,
input => textarea,
required => false,
default => <<"">>,
title => #{en => <<"Body">>,
zh => <<"Body"/utf8>>},
description => #{en => <<"The HTTP body supports the use of ${Var} to obtain the field value output by the rule.\n"
"The content of the default HTTP request body is a JSON string composed of the keys and values of all fields output by the rule.">>,
zh => <<"HTTP 请求体,支持使用 ${Var} 获取规则输出的字段值\n"
"默认 HTTP 请求体的内容为规则输出的所有字段的键和值构成的 JSON 字符串。"/utf8>>}}
}).
-resource_type(
#{name => ?RESOURCE_TYPE_WEBHOOK,
create => on_resource_create,
status => on_get_resource_status,
destroy => on_resource_destroy,
params => ?RESOURCE_CONFIG_SPEC,
title => #{en => <<"WebHook">>,
zh => <<"WebHook"/utf8>>},
description => #{en => <<"WebHook">>,
zh => <<"WebHook"/utf8>>}
}).
-rule_action(#{name => data_to_webserver,
category => data_forward,
for => '$any',
create => on_action_create_data_to_webserver,
params => ?ACTION_DATA_SPEC,
types => [?RESOURCE_TYPE_WEBHOOK],
title => #{en => <<"Data to Web Server">>,
zh => <<"发送数据到 Web 服务"/utf8>>},
description => #{en => <<"Forward Messages to Web Server">>,
zh => <<"将数据转发给 Web 服务"/utf8>>}
}).
%%------------------------------------------------------------------------------
%% Actions for web hook
%%------------------------------------------------------------------------------
-spec(on_resource_create(binary(), map()) -> map()).
on_resource_create(ResId, Conf) ->
{ok, _} = application:ensure_all_started(ehttpc),
Options = pool_opts(Conf, ResId),
PoolName = pool_name(ResId),
case test_http_connect(Conf) of
true -> ok;
false -> error({error, check_http_connectivity_failed})
end,
start_resource(ResId, PoolName, Options),
Conf#{<<"pool">> => PoolName, options => Options}.
start_resource(ResId, PoolName, Options) ->
case ehttpc_pool:start_pool(PoolName, Options) of
{ok, _} ->
?LOG(info, "Initiated Resource ~p Successfully, ResId: ~p",
[?RESOURCE_TYPE_WEBHOOK, ResId]);
{error, {already_started, _Pid}} ->
on_resource_destroy(ResId, #{<<"pool">> => PoolName}),
start_resource(ResId, PoolName, Options);
{error, Reason} ->
?LOG(error, "Initiate Resource ~p failed, ResId: ~p, ~0p",
[?RESOURCE_TYPE_WEBHOOK, ResId, Reason]),
error({{?RESOURCE_TYPE_WEBHOOK, ResId}, create_failed})
end.
-spec(on_get_resource_status(binary(), map()) -> map()).
on_get_resource_status(_ResId, Conf) ->
#{is_alive => test_http_connect(Conf)}.
-spec(on_resource_destroy(binary(), map()) -> ok | {error, Reason::term()}).
on_resource_destroy(ResId, #{<<"pool">> := PoolName}) ->
?LOG(info, "Destroying Resource ~p, ResId: ~p", [?RESOURCE_TYPE_WEBHOOK, ResId]),
case ehttpc_pool:stop_pool(PoolName) of
ok ->
?LOG(info, "Destroyed Resource ~p Successfully, ResId: ~p", [?RESOURCE_TYPE_WEBHOOK, ResId]);
{error, Reason} ->
?LOG(error, "Destroy Resource ~p failed, ResId: ~p, ~p", [?RESOURCE_TYPE_WEBHOOK, ResId, Reason]),
error({{?RESOURCE_TYPE_WEBHOOK, ResId}, destroy_failed})
end.
%% An action that forwards publish messages to a remote web server.
-spec(on_action_create_data_to_webserver(Id::binary(), #{url() := string()}) -> {bindings(), NewParams :: map()}).
on_action_create_data_to_webserver(Id, Params) ->
#{method := Method,
path := Path,
headers := Headers,
body := Body,
pool := Pool,
request_timeout := RequestTimeout} = parse_action_params(Params),
BodyTokens = emqx_plugin_libs_rule:preproc_tmpl(Body),
PathTokens = emqx_plugin_libs_rule:preproc_tmpl(Path),
Params.
on_action_data_to_webserver(Selected, _Envs =
#{?BINDING_KEYS := #{
'Id' := Id,
'Method' := Method,
'Headers' := Headers,
'PathTokens' := PathTokens,
'BodyTokens' := BodyTokens,
'Pool' := Pool,
'RequestTimeout' := RequestTimeout},
clientid := ClientID}) ->
NBody = format_msg(BodyTokens, Selected),
NPath = emqx_plugin_libs_rule:proc_tmpl(PathTokens, Selected),
Req = create_req(Method, NPath, Headers, NBody),
case ehttpc:request(ehttpc_pool:pick_worker(Pool, ClientID), Method, Req, RequestTimeout) of
{ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 ->
emqx_rule_metrics:inc_actions_success(Id);
{ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 ->
emqx_rule_metrics:inc_actions_success(Id);
{ok, StatusCode, _} ->
?LOG(warning, "[WebHook Action] HTTP request failed with status code: ~p", [StatusCode]),
emqx_rule_metrics:inc_actions_error(Id);
{ok, StatusCode, _, _} ->
?LOG(warning, "[WebHook Action] HTTP request failed with status code: ~p", [StatusCode]),
emqx_rule_metrics:inc_actions_error(Id);
{error, Reason} ->
?LOG(error, "[WebHook Action] HTTP request error: ~p", [Reason]),
emqx_rule_metrics:inc_actions_error(Id)
end.
format_msg([], Data) ->
emqx_json:encode(Data);
format_msg(Tokens, Data) ->
emqx_plugin_libs_rule:proc_tmpl(Tokens, Data).
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
create_req(Method, Path, Headers, _Body)
when Method =:= get orelse Method =:= delete ->
{Path, Headers};
create_req(_, Path, Headers, Body) ->
{Path, Headers, Body}.
parse_action_params(Params = #{<<"url">> := URL}) ->
try
{ok, #{path := CommonPath}} = emqx_http_lib:uri_parse(URL),
Method = method(maps:get(<<"method">>, Params, <<"POST">>)),
Headers = headers(maps:get(<<"headers">>, Params, undefined)),
NHeaders = ensure_content_type_header(Headers, Method),
#{method => Method,
path => merge_path(CommonPath, maps:get(<<"path">>, Params, <<>>)),
headers => NHeaders,
body => maps:get(<<"body">>, Params, <<>>),
request_timeout => hocon_postprocess:duration(str(maps:get(<<"request_timeout">>, Params, <<"5s">>))),
pool => maps:get(<<"pool">>, Params)}
catch _:_ ->
throw({invalid_params, Params})
end.
ensure_content_type_header(Headers, Method) when Method =:= post orelse Method =:= put ->
Headers;
ensure_content_type_header(Headers, _Method) ->
lists:keydelete("content-type", 1, Headers).
merge_path(CommonPath, <<>>) ->
CommonPath;
merge_path(CommonPath, Path0) ->
case emqx_http_lib:uri_parse(Path0) of
{ok, #{path := Path1, 'query' := Query}} ->
Path2 = filename:join(CommonPath, Path1),
<<Path2/binary, "?", Query/binary>>;
{ok, #{path := Path1}} ->
filename:join(CommonPath, Path1)
end.
method(GET) when GET == <<"GET">>; GET == <<"get">> -> get;
method(POST) when POST == <<"POST">>; POST == <<"post">> -> post;
method(PUT) when PUT == <<"PUT">>; PUT == <<"put">> -> put;
method(DEL) when DEL == <<"DELETE">>; DEL == <<"delete">> -> delete.
headers(undefined) -> [];
headers(Headers) when is_map(Headers) ->
headers(maps:to_list(Headers));
headers(Headers) when is_list(Headers) ->
[{string:to_lower(str(K)), str(V)} || {K, V} <- Headers].
str(Str) when is_list(Str) -> Str;
str(Atom) when is_atom(Atom) -> atom_to_list(Atom);
str(Bin) when is_binary(Bin) -> binary_to_list(Bin).
pool_opts(Params = #{<<"url">> := URL}, ResId) ->
{ok, #{host := Host,
port := Port,
scheme := Scheme}} = emqx_http_lib:uri_parse(URL),
PoolSize = maps:get(<<"pool_size">>, Params, 32),
ConnectTimeout =
hocon_postprocess:duration(str(maps:get(<<"connect_timeout">>, Params, <<"5s">>))),
TransportOpts0 =
case Scheme =:= https of
true -> [get_ssl_opts(Params, ResId)];
false -> []
end,
TransportOpts = emqx_misc:ipv6_probe(TransportOpts0),
Opts = case Scheme =:= https of
true -> [{transport_opts, TransportOpts}, {transport, ssl}];
false -> [{transport_opts, TransportOpts}]
end,
[{host, Host},
{port, Port},
{pool_size, PoolSize},
{pool_type, hash},
{connect_timeout, ConnectTimeout},
{retry, 5},
{retry_timeout, 1000} | Opts].
pool_name(ResId) ->
list_to_atom("webhook:" ++ str(ResId)).
get_ssl_opts(Opts, ResId) ->
emqx_plugin_libs_ssl:save_files_return_opts(Opts, "rules", ResId).
test_http_connect(Conf) ->
Url = fun() -> maps:get(<<"url">>, Conf) end,
try
emqx_plugin_libs_rule:http_connectivity(Url())
of
ok -> true;
{error, _Reason} ->
?LOG(error, "check http_connectivity failed: ~p", [Url()]),
false
catch
Err:Reason:ST ->
?LOG(error, "check http_connectivity failed: ~p, ~0p", [Conf, {Err, Reason, ST}]),
false
end.

View File

@ -1,197 +0,0 @@
#Rule-Engine-APIs
## ENVs
APPSECRET="88ebdd6569afc:Mjg3MzUyNTI2Mjk2NTcyOTEwMDEwMDMzMTE2NTM1MTkzNjA"
## Rules
### test sql
$ curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/rules?test' -d \
'{"rawsql":"select * from \"message.publish\" where topic=\"t/a\"","ctx":{}}'
### create
```shell
$ curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/rules' -d \
'{"rawsql":"select * from \"t/a\"","actions":[{"name":"inspect","params":{"a":1}}],"description":"test-rule"}'
{"code":0,"data":{"actions":[{"name":"inspect","params":{"a":1}}],"description":"test-rule","enabled":true,"id":"rule:bc987915","rawsql":"select * from \"t/a\""}}
## with a resource id in the action args
$ curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/rules' -d \
'{"rawsql":"select * from \"t/a\"","actions":[{"name":"inspect","params":{"$resource":"resource:3a7b44a1"}}],"description":"test-rule"}'
{"code":0,"data":{"actions":[{"name":"inspect","params":{"$resource":"resource:3a7b44a1","a":1}}],"description":"test-rule","enabled":true,"id":"rule:6fce0ca9","rawsql":"select * from \"t/a\""}}
```
### modify
```shell
## modify all of the params
$ curl -XPUT -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/rules/rule:bc987915' -d \
'{"rawsql":"select * from \"t/a\"","actions":[{"name":"inspect","params":{"a":1}}],"description":"test-rule"}'
## modify some of the params: disable it
$ curl -XPUT -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/rules/rule:bc987915' -d \
'{"enabled": false}'
## modify some of the params: add fallback actions
$ curl -XPUT -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/rules/rule:bc987915' -d \
'{"actions":[{"name":"inspect","params":{"a":1}, "fallbacks": [{"name":"donothing"}]}]}'
```
### show
```shell
$ curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/rules/rule:bc987915'
{"code":0,"data":{"actions":[{"name":"inspect","params":{"a":1}}],"description":"test-rule","enabled":true,"id":"rule:bc987915","rawsql":"select * from \"t/a\""}}
```
### list
```shell
$ curl -v --basic -u $APPSECRET -k http://localhost:8081/api/v4/rules
{"code":0,"data":[{"actions":[{"name":"inspect","params":{"a":1}}],"description":"test-rule","enabled":true,"id":"rule:bc987915","rawsql":"select * from \"t/a\""},{"actions":[{"name":"inspect","params":{"$resource":"resource:3a7b44a1","a":1}}],"description":"test-rule","enabled":true,"id":"rule:6fce0ca9","rawsql":"select * from \"t/a\""}]}
```
### delete
```shell
$ curl -XDELETE -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/rules/rule:bc987915'
{"code":0}
```
## Actions
### list
```shell
$ curl -v --basic -u $APPSECRET -k http://localhost:8081/api/v4/actions
{"code":0,"data":[{"app":"emqx_rule_engine","description":"Republish a MQTT message to a another topic","name":"republish","params":{...},"types":[]},{"app":"emqx_rule_engine","description":"Inspect the details of action params for debug purpose","name":"inspect","params":{},"types":[]},{"app":"emqx_web_hook","description":"Forward Messages to Web Server","name":"data_to_webserver","params":{...},"types":["web_hook"]}]}
```
### show
```shell
$ curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/actions/inspect'
{"code":0,"data":{"app":"emqx_rule_engine","description":"Debug Action","name":"inspect","params":{"$resource":"built_in"}}}
```
## Resource Types
### list
```shell
$ curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/resource_types'
{"code":0,"data":[{"description":"Debug resource type","name":"built_in","params":{},"provider":"emqx_rule_engine"}]}
```
### list all resources of a type
```shell
$ curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/resource_types/built_in/resources'
{"code":0,"data":[{"attrs":"undefined","config":{"a":1},"description":"test-rule","id":"resource:71df3086","type":"built_in"}]}
```
### show
```shell
$ curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/resource_types/built_in'
{"code":0,"data":{"description":"Debug resource type","name":"built_in","params":{},"provider":"emqx_rule_engine"}}
```
## Resources
### create
```shell
$ curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/resources' -d \
'{"type": "built_in", "config": {"a":1}, "description": "test-resource"}'
{"code":0,"data":{"attrs":"undefined","config":{"a":1},"description":"test-resource","id":"resource:71df3086","type":"built_in"}}
```
### start
```shell
$ curl -XPOST -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/resources/resource:71df3086'
{"code":0}
```
### list
```shell
$ curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/resources'
{"code":0,"data":[{"attrs":"undefined","config":{"a":1},"description":"test-resource","id":"resource:71df3086","type":"built_in"}]}
```
### show
```shell
$ curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/resources/resource:71df3086'
{"code":0,"data":{"attrs":"undefined","config":{"a":1},"description":"test-resource","id":"resource:71df3086","type":"built_in"}}
```
### get resource status
```shell
curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/resource_status/resource:71df3086'
{"code":0,"data":{"is_alive":true}}
```
### delete
```shell
$ curl -XDELETE -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/resources/resource:71df3086'
{"code":0}
```
## Rule example using webhook
``` shell
$ curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/resources' -d \
'{"type": "web_hook", "config": {"url": "http://127.0.0.1:9910", "headers": {"token":"axfw34y235wrq234t4ersgw4t"}, "method": "POST"}, "description": "web hook resource-1"}'
{"code":0,"data":{"attrs":"undefined","config":{"headers":{"token":"axfw34y235wrq234t4ersgw4t"},"method":"POST","url":"http://127.0.0.1:9910"},"description":"web hook resource-1","id":"resource:8531a11f","type":"web_hook"}}
curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/rules' -d \
'{"rawsql":"SELECT clientid as c, username as u.name FROM \"#\"","actions":[{"name":"data_to_webserver","params":{"$resource": "resource:8531a11f"}}],"description":"Forward connected events to webhook"}'
{"code":0,"data":{"actions":[{"name":"data_to_webserver","params":{"$resource":"resource:8531a11f","headers":{"token":"axfw34y235wrq234t4ersgw4t"},"method":"POST","url":"http://127.0.0.1:9910"}}],"description":"Forward connected events to webhook","enabled":true,"id":"rule:4fe05936","rawsql":"select * from \"#\""}}
```
Start a `web server` using `nc`, and then connect to emqx broker using a mqtt client with username = 'Shawn':
```shell
$ echo -e "HTTP/1.1 200 OK\n\n $(date)" | nc -l 127.0.0.1 9910
POST / HTTP/1.1
content-type: application/json
content-length: 48
te:
host: 127.0.0.1:9910
connection: keep-alive
token: axfw34y235wrq234t4ersgw4t
{"c":"clientId-bP70ymeIyo","u":{"name":"Shawn"}
```

View File

@ -1,164 +0,0 @@
#Rule-Engine-CLIs
## Rules
### create
```shell
$ ./bin/emqx_ctl rules create 'SELECT payload FROM "t/#" username="Steven"' '[{"name":"data_to_webserver", "params": {"$resource": "resource:9093f1cb"}}]' --descr="Msg From Steven to WebServer"
Rule rule:98a75239 created
```
### modify
```shell
## update sql, action, description
$ ./bin/emqx_ctl rules update 'rule:98a75239' \
-s "select * from \"t/a\" " \
-a '[{"name":"do_nothing", "fallbacks": []' -g continue \
-d 'Rule for debug2' \
## update sql only
$ ./bin/emqx_ctl rules update 'rule:98a75239' -s 'SELECT * FROM "t/a"'
## disable the rule
$ ./bin/emqx_ctl rules update 'rule:98a75239' -e false
```
### show
```shell
$ ./bin/emqx_ctl rules show rule:98a75239
rule(id='rule:98a75239', rawsql='SELECT payload FROM "t/#" username="Steven"', actions=[{"name":"data_to_webserver","params":{"$resource":"resource:9093f1cb","url":"http://host-name/chats"}}], enabled='true', description='Msg From Steven to WebServer')
```
### list
```shell
$ ./bin/emqx_ctl rules list
rule(id='rule:98a75239', rawsql='SELECT payload FROM "t/#" username="Steven"', actions=[{"name":"data_to_webserver","params":{"$resource":"resource:9093f1cb","url":"http://host-name/chats"}}], enabled='true', description='Msg From Steven to WebServer')
```
### delete
```shell
$ ./bin/emqx_ctl rules delete 'rule:98a75239'
ok
```
## Actions
### list
```shell
$ ./bin/emqx_ctl rule-actions list
action(name='republish', app='emqx_rule_engine', types=[], params=#{...}, description='Republish a MQTT message to a another topic')
action(name='inspect', app='emqx_rule_engine', types=[], params=#{...}, description='Inspect the details of action params for debug purpose')
action(name='data_to_webserver', app='emqx_web_hook', types=[], params=#{...}, description='Forward Messages to Web Server')
```
### show
```shell
$ ./bin/emqx_ctl rule-actions show 'data_to_webserver'
action(name='data_to_webserver', app='emqx_web_hook', types=['web_hook'], params=#{...}, description='Forward Messages to Web Server')
```
## Resource
### create
```shell
$ ./bin/emqx_ctl resources create 'web_hook' -c '{"url": "http://host-name/chats"}' --descr 'Resource towards http://host-name/chats'
Resource resource:19addfef created
```
### list
```shell
$ ./bin/emqx_ctl resources list
resource(id='resource:19addfef', type='web_hook', config=#{<<"url">> => <<"http://host-name/chats">>}, attrs=undefined, description='Resource towards http://host-name/chats')
```
### list all resources of a type
```shell
$ ./bin/emqx_ctl resources list -t 'web_hook'
resource(id='resource:19addfef', type='web_hook', config=#{<<"url">> => <<"http://host-name/chats">>}, attrs=undefined, description='Resource towards http://host-name/chats')
```
### show
```shell
$ ./bin/emqx_ctl resources show 'resource:19addfef'
resource(id='resource:19addfef', type='web_hook', config=#{<<"url">> => <<"http://host-name/chats">>}, attrs=undefined, description='Resource towards http://host-name/chats')
```
### delete
```shell
$ ./bin/emqx_ctl resources delete 'resource:19addfef'
ok
```
## Resources Types
### list
```shell
$ ./bin/emqx_ctl resource-types list
resource_type(name='built_in', provider='emqx_rule_engine', params=#{...}, on_create={emqx_rule_actions,on_resource_create}, description='The built in resource type for debug purpose')
resource_type(name='web_hook', provider='emqx_web_hook', params=#{...}, on_create={emqx_web_hook_actions,on_resource_create}, description='WebHook Resource')
```
### show
```shell
$ ./bin/emqx_ctl resource-types show built_in
resource_type(name='built_in', provider='emqx_rule_engine', params=#{}, description='The built in resource type for debug purpose')
```
## Rule example using webhook
``` shell
1. Create a webhook resource to URL http://127.0.0.1:9910.
./bin/emqx_ctl resources create 'web_hook' --config '{"url": "http://127.0.0.1:9910", "headers": {"token":"axfw34y235wrq234t4ersgw4t"}, "method": "POST"}'
Resource resource:3128243e created
2. Create a rule using action data_to_webserver, and bind above resource to that action.
./bin/emqx_ctl rules create 'client.connected' 'SELECT clientid as c, username as u.name FROM "#"' '[{"name":"data_to_webserver", "params": {"$resource": "resource:3128243e"}}]' --descr "Forward Connected Events to WebServer"
Rule rule:222b59f7 created
```
Start a simple `Web Server` using `nc`, and then connect to emqx broker using a mqtt client with username = 'Shawn':
```shell
$ echo -e "HTTP/1.1 200 OK\n\n $(date)" | nc -l 127.0.0.1 9910
POST / HTTP/1.1
content-type: application/json
content-length: 48
te:
host: 127.0.0.1:9910
connection: keep-alive
token: axfw34y235wrq234t4ersgw4t
{"c":"clientId-bP70ymeIyo","u":{"name":"Shawn"}
```

View File

@ -1,188 +0,0 @@
# EMQ X Rule Engine
This is the design guide of message routing rule engine for the EMQ X Broker.
## Concept
A rule is:
```
when
Match <conditions> | <predicates>
then
Select <data> and Take <action>;
```
or:
```
rule "Rule Name"
when
rule match
select
para1 = val1
para2 = val2
then
action(#{para2 => val1, #para2 => val2})
```
## Architecture
```
|-----------------|
P ---->| Message Routing |----> S
|-----------------|
| /|\
\|/ |
|-----------------|
| Rule Engine |
|-----------------|
| |
Backends Services Bridges
```
## Design
```
Event | Message -> Rules -> Actions -> Resources
```
```
P -> |--------------------| |---------------------------------------|
| Messages (Routing) | -> | Rules (Select Data, Match Conditions) |
S <- |--------------------| |---------------------------------------|
|---------| |-----------| |-------------------------------|
->| Actions | -> | Resources | -> | (Backends, Bridges, WebHooks) |
|---------| |-----------| |-------------------------------|
```
## Rule
A rule consists of a SELECT statement, a topic filter, and a rule action
Rules consist of the following:
- Id
- Name
- Topic
- Description
- Action
- Enabled
The operations on a rule:
- Create
- Enable
- Disable
- Delete
## Action
Actions consist of the following:
- Id
- Name
- For
- App
- Module
- Func
- Args
- Descr
Define a rule action in ADT:
```
action :: Application -> Resource -> Params -> IO ()
```
A rule action:
Module:function(Args)
## Resource
### Resource Name
```
backend:mysql:localhost:port:db
backend:redis:localhost:
webhook:url
bridge:kafka:
bridge:rabbit:localhost
```
### Resource Properties
- Name
- Descr or Description
- Config #{}
- Instances
- State: Running | Stopped
### Resource Management
1. Create Resource
2. List Resources
3. Lookup Resource
4. Delete Resource
5. Test Resource
### Resource State (Lifecircle)
0. Create Resource and Validate a Resource
1. Start/Connect Resource
2. Bind resource name to instance
3. Stop/Disconnect Resource
4. Unbind resource name with instance
5. Is Resource Alive?
### Resource Type
The properties and behaviors of resources is defined by resource types. A resoure type is provided(contributed) by a plugin.
### Resource Type Provider
Provider of resource type is a EMQ X Plugin.
### Resource Manager
```
Supervisor
|
\|/
Action ----> Proxy(Batch|Write) ----> Connection -----> ExternalResource
| /|\
|------------------Fetch----------------|
```
## REST API
Rules API
Actions API
Resources API
## CLI
```
rules list
rules show <RuleId>
rule-actions list
rule-actions show <ActionId>
resources list
resources show <ResourceId>
resource_templates list
resource_templates show <ResourceType>
```

View File

@ -1,11 +0,0 @@
-compile({parse_transform, emqx_rule_actions_trans}).
-type selected_data() :: map().
-type env_vars() :: map().
-type bindings() :: list({atom(), term()}).
-define(BINDING_KEYS, '__bindings__').
-define(bound_v(Key, ENVS0),
maps:get(Key,
maps:get(?BINDING_KEYS, ENVS0, #{}))).

View File

@ -26,5 +26,4 @@ start(_Type, _Args) ->
emqx_rule_engine_sup:start_link().
stop(_State) ->
ok = emqx_rule_events:unload(),
ok = emqx_rule_engine_cli:unload().
ok = emqx_rule_events:unload().

View File

@ -272,7 +272,6 @@ relx_apps(ReleaseType) ->
, emqx_exhook
, emqx_bridge
, emqx_rule_engine
, emqx_rule_actions
, emqx_modules
, emqx_management
, emqx_dashboard