refactor(connector): add connector configs
This commit is contained in:
parent
cf3f2c3057
commit
7d64013edd
|
@ -132,7 +132,7 @@ start_apps(Apps) ->
|
||||||
start_apps(Apps, Handler) when is_function(Handler) ->
|
start_apps(Apps, Handler) when is_function(Handler) ->
|
||||||
%% Load all application code to beam vm first
|
%% Load all application code to beam vm first
|
||||||
%% Because, minirest, ekka etc.. application will scan these modules
|
%% Because, minirest, ekka etc.. application will scan these modules
|
||||||
lists:foreach(fun load/1, [emqx_machine, emqx_conf, emqx | Apps]),
|
lists:foreach(fun load/1, [emqx_conf, emqx | Apps]),
|
||||||
ekka:start(),
|
ekka:start(),
|
||||||
lists:foreach(fun(App) -> start_app(App, Handler) end, [emqx | Apps]).
|
lists:foreach(fun(App) -> start_app(App, Handler) end, [emqx | Apps]).
|
||||||
|
|
||||||
|
|
|
@ -3,73 +3,54 @@
|
||||||
##--------------------------------------------------------------------
|
##--------------------------------------------------------------------
|
||||||
|
|
||||||
## MQTT bridges to/from another MQTT broker
|
## MQTT bridges to/from another MQTT broker
|
||||||
bridges.mqtt.my_mqtt_bridge_to_aws {
|
#bridges.mqtt.my_ingress_mqtt_bridge {
|
||||||
server = "127.0.0.1:1883"
|
# connector = my_mqtt_connector
|
||||||
proto_ver = "v4"
|
# direction = ingress
|
||||||
username = "username1"
|
# ## topic mappings for this bridge
|
||||||
password = ""
|
# from_remote_topic = "aws/#"
|
||||||
clean_start = true
|
# subscribe_qos = 1
|
||||||
keepalive = 300
|
# to_local_topic = "from_aws/${topic}"
|
||||||
retry_interval = "30s"
|
# payload = "${payload}"
|
||||||
max_inflight = 32
|
# qos = "${qos}"
|
||||||
reconnect_interval = "30s"
|
# retain = "${retain}"
|
||||||
bridge_mode = true
|
#
|
||||||
replayq {
|
#}
|
||||||
dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/"
|
#
|
||||||
seg_bytes = "100MB"
|
#bridges.mqtt.my_egress_mqtt_bridge {
|
||||||
offload = false
|
# connector = my_mqtt_connector
|
||||||
}
|
# direction = egress
|
||||||
ssl {
|
# ## topic mappings for this bridge
|
||||||
enable = false
|
# from_local_topic = "emqx/#"
|
||||||
keyfile = "{{ platform_etc_dir }}/certs/client-key.pem"
|
# to_remote_topic = "from_emqx/${topic}"
|
||||||
certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
|
# payload = "${payload}"
|
||||||
cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
|
# qos = 1
|
||||||
}
|
# retain = false
|
||||||
|
#}
|
||||||
## topic mappings for this bridge
|
|
||||||
ingress {
|
|
||||||
from_remote_topic = "aws/#"
|
|
||||||
subscribe_qos = 1
|
|
||||||
to_local_topic = "from_aws/${topic}"
|
|
||||||
payload = "${payload}"
|
|
||||||
qos = "${qos}"
|
|
||||||
retain = "${retain}"
|
|
||||||
}
|
|
||||||
|
|
||||||
egress {
|
|
||||||
from_local_topic = "emqx/#"
|
|
||||||
to_remote_topic = "from_emqx/${topic}"
|
|
||||||
payload = "${payload}"
|
|
||||||
qos = 1
|
|
||||||
retain = false
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
## HTTP bridges to an HTTP server
|
## HTTP bridges to an HTTP server
|
||||||
bridges.http.my_http_bridge {
|
#bridges.http.my_http_bridge {
|
||||||
## NOTE: we cannot use placehodler variables in the `scheme://host:port` part of the url string
|
# ## NOTE: we cannot use placehodler variables in the `scheme://host:port` part of the url string
|
||||||
url = "http://localhost:9901/messages/${topic}"
|
# url = "http://localhost:9901/messages/${topic}"
|
||||||
request_timeout = "30s"
|
# request_timeout = "30s"
|
||||||
connect_timeout = "30s"
|
# connect_timeout = "30s"
|
||||||
max_retries = 3
|
# max_retries = 3
|
||||||
retry_interval = "10s"
|
# retry_interval = "10s"
|
||||||
pool_type = "random"
|
# pool_type = "random"
|
||||||
pool_size = 4
|
# pool_size = 4
|
||||||
enable_pipelining = true
|
# enable_pipelining = true
|
||||||
ssl {
|
# ssl {
|
||||||
enable = false
|
# enable = false
|
||||||
keyfile = "{{ platform_etc_dir }}/certs/client-key.pem"
|
# keyfile = "{{ platform_etc_dir }}/certs/client-key.pem"
|
||||||
certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
|
# certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
|
||||||
cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
|
# cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
|
||||||
}
|
# }
|
||||||
|
#
|
||||||
from_local_topic = "emqx_http/#"
|
# from_local_topic = "emqx_http/#"
|
||||||
## the following config entries can use placehodler variables:
|
# ## the following config entries can use placehodler variables:
|
||||||
## url, method, body, headers
|
# ## url, method, body, headers
|
||||||
method = post
|
# method = post
|
||||||
body = "${payload}"
|
# body = "${payload}"
|
||||||
headers {
|
# headers {
|
||||||
"content-type": "application/json"
|
# "content-type": "application/json"
|
||||||
}
|
# }
|
||||||
}
|
#}
|
||||||
|
|
|
@ -84,7 +84,7 @@ on_message_publish(Message = #message{topic = Topic, flags = Flags}) ->
|
||||||
send_message(BridgeId, Message) ->
|
send_message(BridgeId, Message) ->
|
||||||
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
|
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
|
||||||
ResId = emqx_bridge:resource_id(BridgeType, BridgeName),
|
ResId = emqx_bridge:resource_id(BridgeType, BridgeName),
|
||||||
emqx_resource:query(ResId, {send_message, BridgeId, Message}).
|
emqx_resource:query(ResId, {send_message, Message}).
|
||||||
|
|
||||||
config_key_path() ->
|
config_key_path() ->
|
||||||
[bridges].
|
[bridges].
|
||||||
|
@ -178,7 +178,7 @@ create(Type, Name, Conf) ->
|
||||||
config => Conf}),
|
config => Conf}),
|
||||||
ResId = resource_id(Type, Name),
|
ResId = resource_id(Type, Name),
|
||||||
case emqx_resource:create(ResId,
|
case emqx_resource:create(ResId,
|
||||||
emqx_bridge:resource_type(Type), parse_confs(Type, Conf)) of
|
emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf)) of
|
||||||
{ok, already_created} ->
|
{ok, already_created} ->
|
||||||
emqx_resource:get_instance(ResId);
|
emqx_resource:get_instance(ResId);
|
||||||
{ok, Data} ->
|
{ok, Data} ->
|
||||||
|
@ -199,7 +199,7 @@ update(Type, Name, {_OldConf, Conf}) ->
|
||||||
?SLOG(info, #{msg => "update bridge", type => Type, name => Name,
|
?SLOG(info, #{msg => "update bridge", type => Type, name => Name,
|
||||||
config => Conf}),
|
config => Conf}),
|
||||||
emqx_resource:recreate(resource_id(Type, Name),
|
emqx_resource:recreate(resource_id(Type, Name),
|
||||||
emqx_bridge:resource_type(Type), parse_confs(Type, Conf), []).
|
emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), []).
|
||||||
|
|
||||||
remove(Type, Name, _Conf) ->
|
remove(Type, Name, _Conf) ->
|
||||||
?SLOG(info, #{msg => "remove bridge", type => Type, name => Name}),
|
?SLOG(info, #{msg => "remove bridge", type => Type, name => Name}),
|
||||||
|
@ -227,8 +227,12 @@ get_matched_bridges(Topic) ->
|
||||||
Bridges = emqx:get_config([bridges], #{}),
|
Bridges = emqx:get_config([bridges], #{}),
|
||||||
maps:fold(fun (BType, Conf, Acc0) ->
|
maps:fold(fun (BType, Conf, Acc0) ->
|
||||||
maps:fold(fun
|
maps:fold(fun
|
||||||
(BName, #{egress := Egress}, Acc1) ->
|
%% Confs for MQTT, Kafka bridges have the `direction` flag
|
||||||
|
(_BName, #{direction := ingress}, Acc1) ->
|
||||||
|
Acc1;
|
||||||
|
(BName, #{direction := egress} = Egress, Acc1) ->
|
||||||
get_matched_bridge_id(Egress, Topic, BType, BName, Acc1);
|
get_matched_bridge_id(Egress, Topic, BType, BName, Acc1);
|
||||||
|
%% HTTP, MySQL bridges only have egress direction
|
||||||
(BName, BridgeConf, Acc1) ->
|
(BName, BridgeConf, Acc1) ->
|
||||||
get_matched_bridge_id(BridgeConf, Topic, BType, BName, Acc1)
|
get_matched_bridge_id(BridgeConf, Topic, BType, BName, Acc1)
|
||||||
end, Acc0, Conf)
|
end, Acc0, Conf)
|
||||||
|
@ -240,12 +244,13 @@ get_matched_bridge_id(#{from_local_topic := Filter}, Topic, BType, BName, Acc) -
|
||||||
false -> Acc
|
false -> Acc
|
||||||
end.
|
end.
|
||||||
|
|
||||||
parse_confs(http, #{ url := Url
|
parse_confs(http, _Name,
|
||||||
, method := Method
|
#{ url := Url
|
||||||
, body := Body
|
, method := Method
|
||||||
, headers := Headers
|
, body := Body
|
||||||
, request_timeout := ReqTimeout
|
, headers := Headers
|
||||||
} = Conf) ->
|
, request_timeout := ReqTimeout
|
||||||
|
} = Conf) ->
|
||||||
{BaseUrl, Path} = parse_url(Url),
|
{BaseUrl, Path} = parse_url(Url),
|
||||||
{ok, BaseUrl2} = emqx_http_lib:uri_parse(BaseUrl),
|
{ok, BaseUrl2} = emqx_http_lib:uri_parse(BaseUrl),
|
||||||
Conf#{ base_url => BaseUrl2
|
Conf#{ base_url => BaseUrl2
|
||||||
|
@ -257,8 +262,20 @@ parse_confs(http, #{ url := Url
|
||||||
, request_timeout => ReqTimeout
|
, request_timeout => ReqTimeout
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
parse_confs(_Type, Conf) ->
|
parse_confs(Type, Name, #{connector := ConnName, direction := Direction} = Conf) ->
|
||||||
Conf.
|
ConnectorConfs = emqx:get_config([connectors, Type, ConnName]),
|
||||||
|
make_resource_confs(Direction, ConnectorConfs,
|
||||||
|
maps:without([connector, direction], Conf), Name).
|
||||||
|
|
||||||
|
make_resource_confs(ingress, ConnectorConfs, BridgeConf, Name) ->
|
||||||
|
BName = bin(Name),
|
||||||
|
ConnectorConfs#{
|
||||||
|
ingress => BridgeConf#{hookpoint => <<"$bridges/", BName/binary>>}
|
||||||
|
};
|
||||||
|
make_resource_confs(egress, ConnectorConfs, BridgeConf, _Name) ->
|
||||||
|
ConnectorConfs#{
|
||||||
|
egress => BridgeConf
|
||||||
|
}.
|
||||||
|
|
||||||
parse_url(Url) ->
|
parse_url(Url) ->
|
||||||
case string:split(Url, "//", leading) of
|
case string:split(Url, "//", leading) of
|
||||||
|
|
|
@ -11,17 +11,26 @@ roots() -> [bridges].
|
||||||
|
|
||||||
fields(bridges) ->
|
fields(bridges) ->
|
||||||
[ {mqtt,
|
[ {mqtt,
|
||||||
sc(hoconsc:map(name, ref("mqtt_bridge")),
|
sc(hoconsc:map(name, hoconsc:union([ ref("ingress_mqtt_bridge")
|
||||||
|
, ref("egress_mqtt_bridge")
|
||||||
|
])),
|
||||||
#{ desc => "MQTT bridges"
|
#{ desc => "MQTT bridges"
|
||||||
})}
|
})}
|
||||||
, {http,
|
, {http,
|
||||||
sc(hoconsc:map(name, ref("http_bridge")),
|
sc(hoconsc:map(name, ref("http_bridge")),
|
||||||
#{ desc => "HTTP bridges"
|
#{ desc => "HTTP bridges"
|
||||||
})}
|
})}
|
||||||
];
|
];
|
||||||
|
|
||||||
fields("mqtt_bridge") ->
|
fields("ingress_mqtt_bridge") ->
|
||||||
emqx_connector_mqtt:fields("config");
|
[ direction(ingress, emqx_connector_mqtt_schema:ingress_desc())
|
||||||
|
, connector_name()
|
||||||
|
] ++ proplists:delete(hookpoint, emqx_connector_mqtt_schema:fields("ingress"));
|
||||||
|
|
||||||
|
fields("egress_mqtt_bridge") ->
|
||||||
|
[ direction(egress, emqx_connector_mqtt_schema:egress_desc())
|
||||||
|
, connector_name()
|
||||||
|
] ++ emqx_connector_mqtt_schema:fields("egress");
|
||||||
|
|
||||||
fields("http_bridge") ->
|
fields("http_bridge") ->
|
||||||
basic_config_http() ++
|
basic_config_http() ++
|
||||||
|
@ -85,6 +94,24 @@ How long will the HTTP request timeout.
|
||||||
})}
|
})}
|
||||||
].
|
].
|
||||||
|
|
||||||
|
direction(Dir, Desc) ->
|
||||||
|
{direction,
|
||||||
|
sc(Dir,
|
||||||
|
#{ nullable => false
|
||||||
|
, desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.<br>" ++
|
||||||
|
Desc
|
||||||
|
})}.
|
||||||
|
|
||||||
|
connector_name() ->
|
||||||
|
{connector,
|
||||||
|
sc(binary(),
|
||||||
|
#{ nullable => false
|
||||||
|
, desc =>"""
|
||||||
|
The connector name to be used for this bridge.
|
||||||
|
Connectors are configured by 'connectors.<type>.<name>
|
||||||
|
"""
|
||||||
|
})}.
|
||||||
|
|
||||||
basic_config_http() ->
|
basic_config_http() ->
|
||||||
proplists:delete(base_url, emqx_connector_http:fields(config)).
|
proplists:delete(base_url, emqx_connector_http:fields(config)).
|
||||||
|
|
||||||
|
|
|
@ -112,6 +112,7 @@ t_crud_apis(_) ->
|
||||||
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
||||||
|
|
||||||
%% then we add a http bridge now
|
%% then we add a http bridge now
|
||||||
|
%% PUT /bridges/:id will create or update a bridge
|
||||||
{ok, 200, Bridge} = request(put, uri(["bridges", "http:test_bridge"]), ?HTTP_BRIDGE(?URL1)),
|
{ok, 200, Bridge} = request(put, uri(["bridges", "http:test_bridge"]), ?HTTP_BRIDGE(?URL1)),
|
||||||
%ct:pal("---bridge: ~p", [Bridge]),
|
%ct:pal("---bridge: ~p", [Bridge]),
|
||||||
?assertMatch([ #{ <<"id">> := <<"http:test_bridge">>
|
?assertMatch([ #{ <<"id">> := <<"http:test_bridge">>
|
||||||
|
@ -139,11 +140,35 @@ t_crud_apis(_) ->
|
||||||
, <<"url">> := ?URL2
|
, <<"url">> := ?URL2
|
||||||
}], jsx:decode(Bridge2Str)),
|
}], jsx:decode(Bridge2Str)),
|
||||||
|
|
||||||
|
%% get the bridge by id
|
||||||
|
{ok, 200, Bridge3Str} = request(get, uri(["bridges", "http:test_bridge"]), []),
|
||||||
|
?assertMatch([#{ <<"id">> := <<"http:test_bridge">>
|
||||||
|
, <<"bridge_type">> := <<"http">>
|
||||||
|
, <<"is_connected">> := _
|
||||||
|
, <<"node">> := _
|
||||||
|
, <<"url">> := ?URL2
|
||||||
|
}], jsx:decode(Bridge3Str)),
|
||||||
|
|
||||||
%% delete the bridge
|
%% delete the bridge
|
||||||
{ok,200,<<>>} = request(delete, uri(["bridges", "http:test_bridge"]), []),
|
{ok,200,<<>>} = request(delete, uri(["bridges", "http:test_bridge"]), []),
|
||||||
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_change_is_connnected_to_status() ->
|
||||||
|
error(not_implimented).
|
||||||
|
|
||||||
|
t_start_stop_bridges(_) ->
|
||||||
|
start_http_server(9901, fun handle_fun_200_ok/1),
|
||||||
|
{ok, 200, Bridge} = request(put, uri(["bridges", "http:test_bridge"]), ?HTTP_BRIDGE(?URL1)),
|
||||||
|
?assertMatch( #{ <<"id">> := <<"http:test_bridge">>
|
||||||
|
, <<"bridge_type">> := <<"http">>
|
||||||
|
, <<"is_connected">> := true
|
||||||
|
, <<"node">> := _
|
||||||
|
, <<"url">> := ?URL1
|
||||||
|
}, jsx:decode(Bridge)),
|
||||||
|
{ok, 200, <<>>} = request(put,
|
||||||
|
uri(["nodes", node(), "bridges", "http:test_bridge", "operation", "stop"]),
|
||||||
|
?HTTP_BRIDGE(?URL1)).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% HTTP Request
|
%% HTTP Request
|
||||||
|
|
|
@ -56,6 +56,7 @@
|
||||||
, emqx_exhook_schema
|
, emqx_exhook_schema
|
||||||
, emqx_psk_schema
|
, emqx_psk_schema
|
||||||
, emqx_limiter_schema
|
, emqx_limiter_schema
|
||||||
|
, emqx_connector_schema
|
||||||
]).
|
]).
|
||||||
|
|
||||||
namespace() -> undefined.
|
namespace() -> undefined.
|
||||||
|
|
|
@ -0,0 +1,23 @@
|
||||||
|
#connectors.mqtt.my_mqtt_connector {
|
||||||
|
# server = "127.0.0.1:1883"
|
||||||
|
# proto_ver = "v4"
|
||||||
|
# username = "username1"
|
||||||
|
# password = ""
|
||||||
|
# clean_start = true
|
||||||
|
# keepalive = 300
|
||||||
|
# retry_interval = "30s"
|
||||||
|
# max_inflight = 32
|
||||||
|
# reconnect_interval = "30s"
|
||||||
|
# bridge_mode = true
|
||||||
|
# replayq {
|
||||||
|
# dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/"
|
||||||
|
# seg_bytes = "100MB"
|
||||||
|
# offload = false
|
||||||
|
# }
|
||||||
|
# ssl {
|
||||||
|
# enable = false
|
||||||
|
# keyfile = "{{ platform_etc_dir }}/certs/client-key.pem"
|
||||||
|
# certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
|
||||||
|
# cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
|
||||||
|
# }
|
||||||
|
#}
|
|
@ -178,9 +178,9 @@ on_stop(InstId, #{pool_name := PoolName}) ->
|
||||||
connector => InstId}),
|
connector => InstId}),
|
||||||
ehttpc_sup:stop_pool(PoolName).
|
ehttpc_sup:stop_pool(PoolName).
|
||||||
|
|
||||||
on_query(InstId, {send_message, BridgeId, Msg}, AfterQuery, State) ->
|
on_query(InstId, {send_message, Msg}, AfterQuery, State) ->
|
||||||
case maps:get(request, State, undefined) of
|
case maps:get(request, State, undefined) of
|
||||||
undefined -> ?SLOG(error, #{msg => "request not found", bridge_id => BridgeId});
|
undefined -> ?SLOG(error, #{msg => "request not found", connector => InstId});
|
||||||
Request ->
|
Request ->
|
||||||
#{method := Method, path := Path, body := Body, headers := Headers,
|
#{method := Method, path := Path, body := Body, headers := Headers,
|
||||||
request_timeout := Timeout} = process_request(Request, Msg),
|
request_timeout := Timeout} = process_request(Request, Msg),
|
||||||
|
|
|
@ -89,64 +89,70 @@ drop_bridge(Name) ->
|
||||||
%% ===================================================================
|
%% ===================================================================
|
||||||
%% When use this bridge as a data source, ?MODULE:on_message_received/2 will be called
|
%% When use this bridge as a data source, ?MODULE:on_message_received/2 will be called
|
||||||
%% if the bridge received msgs from the remote broker.
|
%% if the bridge received msgs from the remote broker.
|
||||||
on_message_received(Msg, BridgeId) ->
|
on_message_received(Msg, HookPoint) ->
|
||||||
Name = atom_to_binary(BridgeId, utf8),
|
emqx:run_hook(HookPoint, [Msg]).
|
||||||
emqx:run_hook(<<"$bridges/", Name/binary>>, [Msg]).
|
|
||||||
|
|
||||||
%% ===================================================================
|
%% ===================================================================
|
||||||
on_start(InstId, Conf) ->
|
on_start(InstId, Conf) ->
|
||||||
"bridge:" ++ NamePrefix = binary_to_list(InstId),
|
InstanceId = binary_to_atom(InstId, utf8),
|
||||||
BridgeId = list_to_atom(NamePrefix),
|
|
||||||
?SLOG(info, #{msg => "starting mqtt connector",
|
?SLOG(info, #{msg => "starting mqtt connector",
|
||||||
connector => BridgeId, config => Conf}),
|
connector => InstanceId, config => Conf}),
|
||||||
BasicConf = basic_config(Conf),
|
BasicConf = basic_config(Conf),
|
||||||
SubRemoteConf = maps:get(ingress, Conf, #{}),
|
|
||||||
FrowardConf = maps:get(egress, Conf, #{}),
|
|
||||||
BridgeConf = BasicConf#{
|
BridgeConf = BasicConf#{
|
||||||
name => BridgeId,
|
name => InstanceId,
|
||||||
clientid => clientid(BridgeId),
|
clientid => clientid(InstanceId),
|
||||||
subscriptions => SubRemoteConf#{
|
subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined)),
|
||||||
to_local_topic => maps:get(to_local_topic, SubRemoteConf, undefined),
|
forwards => make_forward_confs(maps:get(egress, Conf, undefined))
|
||||||
on_message_received => {fun ?MODULE:on_message_received/2, [BridgeId]}
|
|
||||||
},
|
|
||||||
forwards => FrowardConf#{
|
|
||||||
from_local_topic => maps:get(from_local_topic, FrowardConf, undefined)
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
case ?MODULE:create_bridge(BridgeConf) of
|
case ?MODULE:create_bridge(BridgeConf) of
|
||||||
{ok, _Pid} ->
|
{ok, _Pid} ->
|
||||||
case emqx_connector_mqtt_worker:ensure_started(BridgeId) of
|
case emqx_connector_mqtt_worker:ensure_started(InstanceId) of
|
||||||
ok -> {ok, #{name => BridgeId}};
|
ok -> {ok, #{name => InstanceId}};
|
||||||
{error, Reason} -> {error, Reason}
|
{error, Reason} -> {error, Reason}
|
||||||
end;
|
end;
|
||||||
{error, {already_started, _Pid}} ->
|
{error, {already_started, _Pid}} ->
|
||||||
{ok, #{name => BridgeId}};
|
{ok, #{name => InstanceId}};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
on_stop(_InstId, #{name := BridgeId}) ->
|
on_stop(_InstId, #{name := InstanceId}) ->
|
||||||
?SLOG(info, #{msg => "stopping mqtt connector",
|
?SLOG(info, #{msg => "stopping mqtt connector",
|
||||||
connector => BridgeId}),
|
connector => InstanceId}),
|
||||||
case ?MODULE:drop_bridge(BridgeId) of
|
case ?MODULE:drop_bridge(InstanceId) of
|
||||||
ok -> ok;
|
ok -> ok;
|
||||||
{error, not_found} -> ok;
|
{error, not_found} -> ok;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?SLOG(error, #{msg => "stop mqtt connector",
|
?SLOG(error, #{msg => "stop mqtt connector",
|
||||||
connector => BridgeId, reason => Reason})
|
connector => InstanceId, reason => Reason})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
on_query(_InstId, {send_message, BridgeId, Msg}, _AfterQuery, _State) ->
|
on_query(_InstId, {send_message, Msg}, _AfterQuery, #{name := InstanceId}) ->
|
||||||
?SLOG(debug, #{msg => "send msg to remote node", message => Msg,
|
?SLOG(debug, #{msg => "send msg to remote node", message => Msg,
|
||||||
connector => BridgeId}),
|
connector => InstanceId}),
|
||||||
emqx_connector_mqtt_worker:send_to_remote(BridgeId, Msg).
|
emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg).
|
||||||
|
|
||||||
on_health_check(_InstId, #{name := BridgeId} = State) ->
|
on_health_check(_InstId, #{name := InstanceId} = State) ->
|
||||||
case emqx_connector_mqtt_worker:ping(BridgeId) of
|
case emqx_connector_mqtt_worker:ping(InstanceId) of
|
||||||
pong -> {ok, State};
|
pong -> {ok, State};
|
||||||
_ -> {error, {connector_down, BridgeId}, State}
|
_ -> {error, {connector_down, InstanceId}, State}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
make_sub_confs(undefined) ->
|
||||||
|
undefined;
|
||||||
|
make_sub_confs(SubRemoteConf) ->
|
||||||
|
case maps:take(hookpoint, SubRemoteConf) of
|
||||||
|
error -> SubRemoteConf;
|
||||||
|
{HookPoint, SubConf} ->
|
||||||
|
MFA = {?MODULE, on_message_received, [HookPoint]},
|
||||||
|
SubConf#{on_message_received => MFA}
|
||||||
|
end.
|
||||||
|
|
||||||
|
make_forward_confs(undefined) ->
|
||||||
|
undefined;
|
||||||
|
make_forward_confs(FrowardConf) ->
|
||||||
|
FrowardConf.
|
||||||
|
|
||||||
basic_config(#{
|
basic_config(#{
|
||||||
server := Server,
|
server := Server,
|
||||||
reconnect_interval := ReconnIntv,
|
reconnect_interval := ReconnIntv,
|
||||||
|
|
|
@ -0,0 +1,28 @@
|
||||||
|
-module(emqx_connector_schema).
|
||||||
|
|
||||||
|
-behaviour(hocon_schema).
|
||||||
|
|
||||||
|
-include_lib("typerefl/include/types.hrl").
|
||||||
|
|
||||||
|
-export([roots/0, fields/1]).
|
||||||
|
|
||||||
|
%%======================================================================================
|
||||||
|
%% Hocon Schema Definitions
|
||||||
|
|
||||||
|
roots() -> ["connectors"].
|
||||||
|
|
||||||
|
fields("connectors") ->
|
||||||
|
[ {mqtt,
|
||||||
|
sc(hoconsc:map(name,
|
||||||
|
hoconsc:union([ ref("mqtt_connector")
|
||||||
|
])),
|
||||||
|
#{ desc => "MQTT bridges"
|
||||||
|
})}
|
||||||
|
];
|
||||||
|
|
||||||
|
fields("mqtt_connector") ->
|
||||||
|
emqx_connector_mqtt_schema:fields("connector").
|
||||||
|
|
||||||
|
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
|
||||||
|
|
||||||
|
ref(Field) -> hoconsc:ref(?MODULE, Field).
|
|
@ -160,13 +160,17 @@ handle_puback(#{packet_id := PktId, reason_code := RC}, _Parent) ->
|
||||||
|
|
||||||
handle_publish(Msg, undefined) ->
|
handle_publish(Msg, undefined) ->
|
||||||
?SLOG(error, #{msg => "cannot publish to local broker as"
|
?SLOG(error, #{msg => "cannot publish to local broker as"
|
||||||
" ingress_channles' is not configured",
|
" 'ingress' is not configured",
|
||||||
message => Msg});
|
message => Msg});
|
||||||
handle_publish(Msg, #{on_message_received := {OnMsgRcvdFunc, Args}} = Vars) ->
|
handle_publish(Msg, Vars) ->
|
||||||
?SLOG(debug, #{msg => "publish to local broker",
|
?SLOG(debug, #{msg => "publish to local broker",
|
||||||
message => Msg, vars => Vars}),
|
message => Msg, vars => Vars}),
|
||||||
emqx_metrics:inc('bridge.mqtt.message_received_from_remote', 1),
|
emqx_metrics:inc('bridge.mqtt.message_received_from_remote', 1),
|
||||||
_ = erlang:apply(OnMsgRcvdFunc, [Msg | Args]),
|
case Vars of
|
||||||
|
#{on_message_received := {Mod, Func, Args}} ->
|
||||||
|
_ = erlang:apply(Mod, Func, [Msg | Args]);
|
||||||
|
_ -> ok
|
||||||
|
end,
|
||||||
case maps:get(to_local_topic, Vars, undefined) of
|
case maps:get(to_local_topic, Vars, undefined) of
|
||||||
undefined -> ok;
|
undefined -> ok;
|
||||||
_Topic ->
|
_Topic ->
|
||||||
|
|
|
@ -21,7 +21,12 @@
|
||||||
-behaviour(hocon_schema).
|
-behaviour(hocon_schema).
|
||||||
|
|
||||||
-export([ roots/0
|
-export([ roots/0
|
||||||
, fields/1]).
|
, fields/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([ ingress_desc/0
|
||||||
|
, egress_desc/0
|
||||||
|
]).
|
||||||
|
|
||||||
-import(emqx_schema, [mk_duration/2]).
|
-import(emqx_schema, [mk_duration/2]).
|
||||||
|
|
||||||
|
@ -29,6 +34,10 @@ roots() ->
|
||||||
fields("config").
|
fields("config").
|
||||||
|
|
||||||
fields("config") ->
|
fields("config") ->
|
||||||
|
fields("connector") ++
|
||||||
|
topic_mappings();
|
||||||
|
|
||||||
|
fields("connector") ->
|
||||||
[ {server,
|
[ {server,
|
||||||
sc(emqx_schema:ip_port(),
|
sc(emqx_schema:ip_port(),
|
||||||
#{ default => "127.0.0.1:1883"
|
#{ default => "127.0.0.1:1883"
|
||||||
|
@ -76,31 +85,6 @@ fields("config") ->
|
||||||
sc(ref("replayq"),
|
sc(ref("replayq"),
|
||||||
#{ desc => """
|
#{ desc => """
|
||||||
Queue messages in disk files.
|
Queue messages in disk files.
|
||||||
"""
|
|
||||||
})}
|
|
||||||
, {ingress,
|
|
||||||
sc(ref("ingress"),
|
|
||||||
#{ default => #{}
|
|
||||||
, desc => """
|
|
||||||
The ingress config defines how this bridge receive messages from the remote MQTT broker, and then
|
|
||||||
send them to the local broker.<br>
|
|
||||||
Template with variables is allowed in 'to_local_topic', 'subscribe_qos', 'qos', 'retain',
|
|
||||||
'payload'.<br>
|
|
||||||
NOTE: if this bridge is used as the input of a rule (emqx rule engine), and also to_local_topic is
|
|
||||||
configured, then messages got from the remote broker will be sent to both the 'to_local_topic' and
|
|
||||||
the rule.
|
|
||||||
"""
|
|
||||||
})}
|
|
||||||
, {egress,
|
|
||||||
sc(ref("egress"),
|
|
||||||
#{ default => #{}
|
|
||||||
, desc => """
|
|
||||||
The egress config defines how this bridge forwards messages from the local broker to the remote
|
|
||||||
broker.<br>
|
|
||||||
Template with variables is allowed in 'to_remote_topic', 'qos', 'retain', 'payload'.<br>
|
|
||||||
NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic
|
|
||||||
is configured, then both the data got from the rule and the MQTT messages that matches
|
|
||||||
from_local_topic will be forwarded.
|
|
||||||
"""
|
"""
|
||||||
})}
|
})}
|
||||||
] ++ emqx_connector_schema_lib:ssl_fields();
|
] ++ emqx_connector_schema_lib:ssl_fields();
|
||||||
|
@ -122,6 +106,12 @@ fields("ingress") ->
|
||||||
#{ desc => """
|
#{ desc => """
|
||||||
Send messages to which topic of the local broker.<br>
|
Send messages to which topic of the local broker.<br>
|
||||||
Template with variables is allowed.
|
Template with variables is allowed.
|
||||||
|
"""
|
||||||
|
})}
|
||||||
|
, {hookpoint,
|
||||||
|
sc(binary(),
|
||||||
|
#{ desc => """
|
||||||
|
The hookpoint will be triggered when there's any message received from the remote broker.
|
||||||
"""
|
"""
|
||||||
})}
|
})}
|
||||||
] ++ common_inout_confs();
|
] ++ common_inout_confs();
|
||||||
|
@ -170,6 +160,38 @@ the memory cache reaches 'seg_bytes'.
|
||||||
})}
|
})}
|
||||||
].
|
].
|
||||||
|
|
||||||
|
topic_mappings() ->
|
||||||
|
[ {ingress,
|
||||||
|
sc(ref("ingress"),
|
||||||
|
#{ default => #{}
|
||||||
|
, desc => ingress_desc()
|
||||||
|
})}
|
||||||
|
, {egress,
|
||||||
|
sc(ref("egress"),
|
||||||
|
#{ default => #{}
|
||||||
|
, desc => egress_desc()
|
||||||
|
})}
|
||||||
|
].
|
||||||
|
|
||||||
|
ingress_desc() -> """
|
||||||
|
The ingress config defines how this bridge receive messages from the remote MQTT broker, and then
|
||||||
|
send them to the local broker.<br>
|
||||||
|
Template with variables is allowed in 'to_local_topic', 'subscribe_qos', 'qos', 'retain',
|
||||||
|
'payload'.<br>
|
||||||
|
NOTE: if this bridge is used as the input of a rule (emqx rule engine), and also to_local_topic is
|
||||||
|
configured, then messages got from the remote broker will be sent to both the 'to_local_topic' and
|
||||||
|
the rule.
|
||||||
|
""".
|
||||||
|
|
||||||
|
egress_desc() -> """
|
||||||
|
The egress config defines how this bridge forwards messages from the local broker to the remote
|
||||||
|
broker.<br>
|
||||||
|
Template with variables is allowed in 'to_remote_topic', 'qos', 'retain', 'payload'.<br>
|
||||||
|
NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic
|
||||||
|
is configured, then both the data got from the rule and the MQTT messages that matches
|
||||||
|
from_local_topic will be forwarded.
|
||||||
|
""".
|
||||||
|
|
||||||
common_inout_confs() ->
|
common_inout_confs() ->
|
||||||
[ {qos,
|
[ {qos,
|
||||||
sc(qos(),
|
sc(qos(),
|
||||||
|
|
|
@ -381,7 +381,7 @@ pop_and_send_loop(#{replayq := Q} = State, N) ->
|
||||||
|
|
||||||
do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Msg) ->
|
do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Msg) ->
|
||||||
?SLOG(error, #{msg => "cannot forward messages to remote broker"
|
?SLOG(error, #{msg => "cannot forward messages to remote broker"
|
||||||
" as forwards is not configured",
|
" as 'egress' is not configured",
|
||||||
messages => Msg});
|
messages => Msg});
|
||||||
do_send(#{inflight := Inflight,
|
do_send(#{inflight := Inflight,
|
||||||
connection := Connection,
|
connection := Connection,
|
||||||
|
|
|
@ -68,7 +68,7 @@ reload() ->
|
||||||
ok = emqx_rule_engine:load_hooks_for_rule(Rule)
|
ok = emqx_rule_engine:load_hooks_for_rule(Rule)
|
||||||
end, emqx_rule_engine:get_rules()).
|
end, emqx_rule_engine:get_rules()).
|
||||||
|
|
||||||
load(<<"$bridges/", _ChannelId/binary>> = BridgeTopic) ->
|
load(<<"$bridges/", _BridgeId/binary>> = BridgeTopic) ->
|
||||||
emqx_hooks:put(BridgeTopic, {?MODULE, on_bridge_message_received,
|
emqx_hooks:put(BridgeTopic, {?MODULE, on_bridge_message_received,
|
||||||
[#{bridge_topic => BridgeTopic}]});
|
[#{bridge_topic => BridgeTopic}]});
|
||||||
load(Topic) ->
|
load(Topic) ->
|
||||||
|
|
Loading…
Reference in New Issue