Merge branch 'release-54' into sync-r54-m-20231218

This commit is contained in:
Thales Macedo Garitezi 2023-12-18 17:21:08 -03:00
commit cf9331a95f
67 changed files with 2085 additions and 633 deletions

View File

@ -21,7 +21,7 @@ endif
# Dashboard version
# from https://github.com/emqx/emqx-dashboard5
export EMQX_DASHBOARD_VERSION ?= v1.5.2
export EMQX_EE_DASHBOARD_VERSION ?= e1.4.0-beta.1
export EMQX_EE_DASHBOARD_VERSION ?= e1.4.0-beta.8
PROFILE ?= emqx
REL_PROFILES := emqx emqx-enterprise

View File

@ -32,10 +32,10 @@
%% `apps/emqx/src/bpapi/README.md'
%% Opensource edition
-define(EMQX_RELEASE_CE, "5.4.0-alpha.1").
-define(EMQX_RELEASE_CE, "5.4.0-alpha.2").
%% Enterprise edition
-define(EMQX_RELEASE_EE, "5.4.0-alpha.1").
-define(EMQX_RELEASE_EE, "5.4.0-alpha.2").
%% The HTTP API version
-define(EMQX_API_VERSION, "5.0").

View File

@ -25,8 +25,6 @@
-callback end_trace_send(emqx_types:packet() | [emqx_types:packet()]) -> ok.
-callback event(EventName :: term(), Attributes :: term()) -> ok.
-type channel_info() :: #{atom() => _}.
-export([
@ -35,9 +33,7 @@
unregister_provider/1,
trace_process_publish/3,
start_trace_send/2,
end_trace_send/1,
event/1,
event/2
end_trace_send/1
]).
-export_type([channel_info/0]).
@ -79,6 +75,7 @@ unregister_provider(Module) ->
-spec provider() -> module() | undefined.
provider() ->
persistent_term:get(?PROVIDER, undefined).
%%--------------------------------------------------------------------
%% trace API
%%--------------------------------------------------------------------
@ -99,13 +96,6 @@ start_trace_send(Delivers, ChannelInfo) ->
end_trace_send(Packets) ->
?with_provider(?FUNCTION_NAME(Packets), ok).
event(Name) ->
event(Name, #{}).
-spec event(term(), term()) -> ok.
event(Name, Attributes) ->
?with_provider(?FUNCTION_NAME(Name, Attributes), ok).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

View File

@ -119,7 +119,7 @@ log_to_db(Log) ->
Audit0 = to_audit(Log),
Audit = Audit0#?AUDIT{
node = node(),
created_at = erlang:system_time(millisecond)
created_at = erlang:system_time(microsecond)
},
mria:dirty_write(?AUDIT, Audit).

View File

@ -32,7 +32,7 @@
{<<"http_method">>, atom},
{<<"gte_created_at">>, timestamp},
{<<"lte_created_at">>, timestamp},
{<<"gte_duration_ms">>, timestamp},
{<<"gte_duration_ms">>, integer},
{<<"lte_duration_ms">>, integer}
]).
-define(DISABLE_MSG, <<"Audit is disabled">>).
@ -130,14 +130,14 @@ schema("/audit") ->
desc => ?DESC(filter_lte_duration_ms)
})},
{gte_created_at,
?HOCON(emqx_utils_calendar:epoch_millisecond(), #{
?HOCON(emqx_utils_calendar:epoch_microsecond(), #{
in => query,
required => false,
example => <<"2023-10-15T00:00:00.820384+08:00">>,
desc => ?DESC(filter_gte_created_at)
})},
{lte_created_at,
?HOCON(emqx_utils_calendar:epoch_millisecond(), #{
?HOCON(emqx_utils_calendar:epoch_microsecond(), #{
in => query,
example => <<"2023-10-16T00:00:00.820384+08:00">>,
required => false,
@ -170,7 +170,7 @@ fields(audit) ->
[
{created_at,
?HOCON(
emqx_utils_calendar:epoch_millisecond(),
emqx_utils_calendar:epoch_microsecond(),
#{
desc => "The time when the log is created"
}

View File

@ -140,9 +140,9 @@ t_disabled(_) ->
t_cli(_Config) ->
Size = mnesia:table_info(emqx_audit, size),
TimeInt = erlang:system_time(millisecond) - 10,
TimeInt = erlang:system_time(microsecond) - 1000,
Time = integer_to_list(TimeInt),
DateStr = calendar:system_time_to_rfc3339(TimeInt, [{unit, millisecond}]),
DateStr = calendar:system_time_to_rfc3339(TimeInt, [{unit, microsecond}]),
Date = emqx_http_lib:uri_encode(DateStr),
ok = emqx_ctl:run_command(["conf", "show", "log"]),
AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]),
@ -164,7 +164,11 @@ t_cli(_Config) ->
],
Data
),
%% check create at is valid
[#{<<"created_at">> := CreateAtRaw}] = Data,
CreateAt = calendar:rfc3339_to_system_time(binary_to_list(CreateAtRaw), [{unit, microsecond}]),
?assert(CreateAt > TimeInt, CreateAtRaw),
?assert(CreateAt < TimeInt + 5000000, CreateAtRaw),
%% check cli filter
{ok, Res1} = emqx_mgmt_api_test_util:request_api(get, AuditPath, "from=cli", AuthHeader),
#{<<"data">> := Data1} = emqx_utils_json:decode(Res1, [return_maps]),
@ -174,25 +178,41 @@ t_cli(_Config) ->
),
?assertMatch(#{<<"data">> := []}, emqx_utils_json:decode(Res2, [return_maps])),
%% check created_at filter
%% check created_at filter microsecond
{ok, Res3} = emqx_mgmt_api_test_util:request_api(
get, AuditPath, "gte_created_at=" ++ Time, AuthHeader
),
#{<<"data">> := Data3} = emqx_utils_json:decode(Res3, [return_maps]),
?assertEqual(1, erlang:length(Data3)),
%% check created_at filter rfc3339
{ok, Res31} = emqx_mgmt_api_test_util:request_api(
get, AuditPath, "gte_created_at=" ++ Date, AuthHeader
),
?assertEqual(Res3, Res31),
%% check created_at filter millisecond
TimeMs = integer_to_list(TimeInt div 1000),
{ok, Res32} = emqx_mgmt_api_test_util:request_api(
get, AuditPath, "gte_created_at=" ++ TimeMs, AuthHeader
),
?assertEqual(Res3, Res32),
%% check created_at filter microsecond
{ok, Res4} = emqx_mgmt_api_test_util:request_api(
get, AuditPath, "lte_created_at=" ++ Time, AuthHeader
),
#{<<"data">> := Data4} = emqx_utils_json:decode(Res4, [return_maps]),
?assertEqual(Size, erlang:length(Data4)),
%% check created_at filter rfc3339
{ok, Res41} = emqx_mgmt_api_test_util:request_api(
get, AuditPath, "lte_created_at=" ++ Date, AuthHeader
),
?assertEqual(Res4, Res41),
%% check created_at filter millisecond
{ok, Res42} = emqx_mgmt_api_test_util:request_api(
get, AuditPath, "lte_created_at=" ++ TimeMs, AuthHeader
),
?assertEqual(Res4, Res42),
%% check duration_ms filter
{ok, Res5} = emqx_mgmt_api_test_util:request_api(
@ -224,7 +244,7 @@ t_max_size(_Config) ->
fun(_) ->
ok = emqx_ctl:run_command(["conf", "show", "log"])
end,
lists:duplicate(110, 1)
lists:duplicate(100, 1)
),
_ = mnesia:dump_log(),
LogCount = wait_for_dirty_write_log_done(1500),

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_auth_http, [
{description, "EMQX External HTTP API Authentication and Authorization"},
{vsn, "0.1.1"},
{vsn, "0.1.2"},
{registered, []},
{mod, {emqx_auth_http_app, []}},
{applications, [

View File

@ -113,7 +113,7 @@ headers(desc) ->
?DESC(?FUNCTION_NAME);
headers(converter) ->
fun(Headers) ->
maps:to_list(maps:merge(default_headers(), transform_header_name(Headers)))
maps:to_list(transform_header_name(Headers))
end;
headers(default) ->
default_headers();
@ -129,7 +129,7 @@ headers_no_content_type(converter) ->
maps:to_list(
maps:without(
[<<"content-type">>],
maps:merge(default_headers_no_content_type(), transform_header_name(Headers))
transform_header_name(Headers)
)
)
end;

View File

@ -620,8 +620,8 @@ lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
?INTERNAL_ERROR(Reason)
end.
lookup_from_local_node(BridgeType, BridgeName) ->
case emqx_bridge:lookup(BridgeType, BridgeName) of
lookup_from_local_node(ActionType, ActionName) ->
case emqx_bridge:lookup(ActionType, ActionName) of
{ok, Res} -> {ok, format_resource(Res, node())};
Error -> Error
end.
@ -895,25 +895,19 @@ aggregate_metrics(
format_resource(
#{
type := Type,
type := ActionType,
name := BridgeName,
raw_config := RawConf,
resource_data := ResourceData
},
Node
) ->
RawConfFull =
case emqx_bridge_v2:is_bridge_v2_type(Type) of
true ->
%% The defaults are already filled in
RawConf;
false ->
fill_defaults(Type, RawConf)
end,
BridgeV1Type = downgrade_type(ActionType, emqx_bridge_lib:get_conf(ActionType, BridgeName)),
RawConfFull = fill_defaults(BridgeV1Type, RawConf),
redact(
maps:merge(
RawConfFull#{
type => downgrade_type(Type, emqx_bridge_lib:get_conf(Type, BridgeName)),
type => BridgeV1Type,
name => maps:get(<<"name">>, RawConf, BridgeName),
node => Node
},

View File

@ -235,11 +235,10 @@ mongodb_structs() ->
kafka_structs() ->
[
{kafka_producer,
{kafka,
mk(
hoconsc:map(name, ref(emqx_bridge_kafka, kafka_producer)),
#{
aliases => [kafka],
desc => <<"Kafka Producer Bridge Config">>,
required => false,
converter => fun kafka_producer_converter/2

View File

@ -31,12 +31,13 @@
-export([
common_bridge_fields/0,
metrics_fields/0,
status_fields/0,
metrics_fields/0
type_and_name_fields/1
]).
%% for testing only
-export([enterprise_api_schemas/1]).
-export([enterprise_api_schemas/1, enterprise_fields_bridges/0]).
%%======================================================================================
%% Hocon Schema Definitions
@ -156,6 +157,12 @@ metrics_fields() ->
)}
].
type_and_name_fields(ConnectorType) ->
[
{type, mk(ConnectorType, #{required => true, desc => ?DESC("desc_type")})},
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}
].
%%======================================================================================
%% For config files
@ -168,11 +175,10 @@ roots() -> [{bridges, ?HOCON(?R_REF(bridges), #{importance => ?IMPORTANCE_LOW})}
fields(bridges) ->
[
{http,
{webhook,
mk(
hoconsc:map(name, ref(emqx_bridge_http_schema, "config")),
#{
aliases => [webhook],
desc => ?DESC("bridges_webhook"),
required => false,
converter => fun http_bridge_converter/2
@ -191,7 +197,7 @@ fields(bridges) ->
end
}
)}
] ++ enterprise_fields_bridges();
] ++ ?MODULE:enterprise_fields_bridges();
fields("metrics") ->
[
{"dropped", mk(integer(), #{desc => ?DESC("metric_dropped")})},

View File

@ -45,6 +45,10 @@
-export([types/0, types_sc/0]).
-export([resource_opts_fields/0, resource_opts_fields/1]).
-export([
api_fields/3
]).
-export([
make_producer_action_schema/1,
make_consumer_action_schema/1,
@ -153,6 +157,24 @@ method_values(get, Type) ->
method_values(put, _Type) ->
#{}.
api_fields("get_bridge_v2", Type, Fields) ->
lists:append(
[
emqx_bridge_schema:type_and_name_fields(Type),
emqx_bridge_schema:status_fields(),
Fields
]
);
api_fields("post_bridge_v2", Type, Fields) ->
lists:append(
[
emqx_bridge_schema:type_and_name_fields(Type),
Fields
]
);
api_fields("put_bridge_v2", _Type, Fields) ->
Fields.
%%======================================================================================
%% HOCON Schema Callbacks
%%======================================================================================

View File

@ -21,7 +21,7 @@ empty_config_test() ->
Conf1 = #{<<"bridges">> => #{}},
Conf2 = #{<<"bridges">> => #{<<"webhook">> => #{}}},
?assertEqual(Conf1, check(Conf1)),
?assertEqual(#{<<"bridges">> => #{<<"http">> => #{}}}, check(Conf2)),
?assertEqual(#{<<"bridges">> => #{<<"webhook">> => #{}}}, check(Conf2)),
ok.
%% ensure webhook config can be checked
@ -33,7 +33,7 @@ webhook_config_test() ->
?assertMatch(
#{
<<"bridges">> := #{
<<"http">> := #{
<<"webhook">> := #{
<<"the_name">> :=
#{
<<"method">> := get,
@ -48,7 +48,7 @@ webhook_config_test() ->
?assertMatch(
#{
<<"bridges">> := #{
<<"http">> := #{
<<"webhook">> := #{
<<"the_name">> :=
#{
<<"method">> := get,
@ -61,7 +61,7 @@ webhook_config_test() ->
),
#{
<<"bridges">> := #{
<<"http">> := #{
<<"webhook">> := #{
<<"the_name">> :=
#{
<<"method">> := get,

View File

@ -106,7 +106,9 @@ setup_mocks() ->
emqx_bridge_v2_schema,
registered_api_schemas,
1,
fun(Method) -> [{bridge_type_bin(), hoconsc:ref(?MODULE, "api_" ++ Method)}] end
fun(Method) ->
[{bridge_type_bin(), hoconsc:ref(?MODULE, "api_v2_" ++ Method)}]
end
),
catch meck:new(emqx_bridge_schema, MeckOpts),
@ -114,7 +116,24 @@ setup_mocks() ->
emqx_bridge_schema,
enterprise_api_schemas,
1,
fun(Method) -> [{bridge_type_bin(), hoconsc:ref(?MODULE, "api_" ++ Method)}] end
fun(Method) ->
[{bridge_type_bin(), hoconsc:ref(?MODULE, "api_v1_" ++ Method)}]
end
),
meck:expect(
emqx_bridge_schema,
enterprise_fields_bridges,
0,
fun() ->
[
{
bridge_type_bin(),
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(?MODULE, v1_bridge)), #{}
)
}
]
end
),
ok.
@ -156,7 +175,7 @@ fields("connector") ->
{on_start_fun, hoconsc:mk(binary(), #{})},
{ssl, hoconsc:ref(ssl)}
];
fields("api_post") ->
fields("api_v2_post") ->
[
{connector, hoconsc:mk(binary(), #{})},
{name, hoconsc:mk(binary(), #{})},
@ -164,6 +183,20 @@ fields("api_post") ->
{send_to, hoconsc:mk(atom(), #{})}
| fields("connector")
];
fields("api_v1_post") ->
ConnectorFields = proplists:delete(resource_opts, fields("connector")),
[
{connector, hoconsc:mk(binary(), #{})},
{name, hoconsc:mk(binary(), #{})},
{type, hoconsc:mk(bridge_type(), #{})},
{send_to, hoconsc:mk(atom(), #{})},
{resource_opts, hoconsc:mk(hoconsc:ref(?MODULE, v1_resource_opts), #{})}
| ConnectorFields
];
fields(v1_bridge) ->
lists:foldl(fun proplists:delete/2, fields("api_v1_post"), [name, type]);
fields(v1_resource_opts) ->
emqx_resource_schema:create_opts(_Overrides = []);
fields(ssl) ->
emqx_schema:client_ssl_opts_schema(#{required => false}).
@ -333,9 +366,11 @@ get_connector_http(Name) ->
create_bridge_http_api_v1(Opts) ->
Name = maps:get(name, Opts),
Overrides = maps:get(overrides, Opts, #{}),
OverrideFn = maps:get(override_fn, Opts, fun(X) -> X end),
BridgeConfig0 = emqx_utils_maps:deep_merge(bridge_config(), Overrides),
BridgeConfig = maps:without([<<"connector">>], BridgeConfig0),
Params = BridgeConfig#{<<"type">> => bridge_type_bin(), <<"name">> => Name},
Params0 = BridgeConfig#{<<"type">> => bridge_type_bin(), <<"name">> => Name},
Params = OverrideFn(Params0),
Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
ct:pal("creating bridge (http v1): ~p", [Params]),
Res = request(post, Path, Params),
@ -919,3 +954,29 @@ t_obfuscated_secrets_probe(_Config) ->
),
ok.
t_v1_api_fill_defaults(_Config) ->
%% Ensure only one sub-field is used, but we get back the defaults filled in.
BridgeName = ?FUNCTION_NAME,
OverrideFn = fun(Params) ->
ResourceOpts = #{<<"resume_interval">> => 100},
maps:put(<<"resource_opts">>, ResourceOpts, Params)
end,
?assertMatch(
{ok,
{{_, 201, _}, _, #{
<<"resource_opts">> :=
#{
<<"resume_interval">> := _,
<<"query_mode">> := _,
<<"inflight_window">> := _,
<<"start_timeout">> := _,
<<"start_after_created">> := _,
<<"max_buffer_bytes">> := _,
<<"batch_size">> := _
}
}}},
create_bridge_http_api_v1(#{name => BridgeName, override_fn => OverrideFn})
),
ok.

View File

@ -968,7 +968,8 @@ t_rule_pointing_to_non_operational_channel(_Config) ->
counters :=
#{
matched := 1,
'actions.failed' := 1
'actions.failed' := 1,
'actions.failed.unknown' := 1
}
},
emqx_metrics_worker:get_metrics(rule_metrics, RuleId)

View File

@ -216,12 +216,8 @@ create_bridge_api(Config, Overrides) ->
BridgeName = ?config(bridge_name, Config),
BridgeConfig0 = ?config(bridge_config, Config),
BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
ConnectorName = ?config(connector_name, Config),
ConnectorType = ?config(connector_type, Config),
ConnectorConfig = ?config(connector_config, Config),
{ok, _Connector} =
emqx_connector:create(ConnectorType, ConnectorName, ConnectorConfig),
{ok, {{_, 201, _}, _, _}} = create_connector_api(Config),
Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName},
Path = emqx_mgmt_api_test_util:api_path(["actions"]),

View File

@ -27,10 +27,14 @@ bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
ParamsKeys = producer_action_parameters_field_keys(),
Config1 = maps:with(CommonActionKeys, BridgeV1Config),
Params = maps:with(ParamsKeys, BridgeV1Config),
emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
Config1#{
<<"connector">> => ConnectorName,
<<"parameters">> => Params
}.
}
).
%%------------------------------------------------------------------------------------------
%% Internal helper fns

View File

@ -59,9 +59,18 @@ fields(producer_action) ->
)
);
fields(action_parameters) ->
UnsupportedFields = [local_topic],
lists:filter(
fun({Key, _Schema}) -> not lists:member(Key, UnsupportedFields) end,
lists:map(
fun
({local_topic, Sc}) ->
Override = #{
%% to please dialyzer...
type => hocon_schema:field_schema(Sc, type),
importance => ?IMPORTANCE_HIDDEN
},
{local_topic, hocon_schema:override(Sc, Override)};
(Field) ->
Field
end,
emqx_bridge_gcp_pubsub:fields(producer)
);
%%=========================================

View File

@ -22,7 +22,7 @@ kafka_producer_test() ->
#{
<<"bridges">> :=
#{
<<"kafka_producer">> :=
<<"kafka">> :=
#{
<<"myproducer">> :=
#{<<"kafka">> := #{}}
@ -35,7 +35,7 @@ kafka_producer_test() ->
#{
<<"bridges">> :=
#{
<<"kafka_producer">> :=
<<"kafka">> :=
#{
<<"myproducer">> :=
#{<<"local_topic">> := _}
@ -48,7 +48,7 @@ kafka_producer_test() ->
#{
<<"bridges">> :=
#{
<<"kafka_producer">> :=
<<"kafka">> :=
#{
<<"myproducer">> :=
#{
@ -64,7 +64,7 @@ kafka_producer_test() ->
#{
<<"bridges">> :=
#{
<<"kafka_producer">> :=
<<"kafka">> :=
#{
<<"myproducer">> :=
#{
@ -166,7 +166,7 @@ message_key_dispatch_validations_test() ->
?assertThrow(
{_, [
#{
path := "bridges.kafka_producer.myproducer.kafka",
path := "bridges.kafka.myproducer.kafka",
reason := "Message key cannot be empty when `key_dispatch` strategy is used"
}
]},
@ -175,7 +175,7 @@ message_key_dispatch_validations_test() ->
?assertThrow(
{_, [
#{
path := "bridges.kafka_producer.myproducer.kafka",
path := "bridges.kafka.myproducer.kafka",
reason := "Message key cannot be empty when `key_dispatch` strategy is used"
}
]},

View File

@ -26,6 +26,7 @@
]).
-define(CONNECTOR_TYPE, mongodb).
-define(ACTION_TYPE, mongodb).
%%=================================================================================================
%% hocon_schema API
@ -107,21 +108,22 @@ fields(Field) when
Field == "put_connector";
Field == "post_connector"
->
emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, fields("connection_fields"));
fields("get_bridge_v2") ->
emqx_bridge_schema:status_fields() ++
fields("post_bridge_v2");
fields("post_bridge_v2") ->
type_and_name_fields(mongodb) ++
fields(mongodb_action);
fields("put_bridge_v2") ->
fields(mongodb_action);
Fields =
fields("connection_fields") ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts),
emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields);
fields(Field) when
Field == "get_bridge_v2";
Field == "post_bridge_v2";
Field == "put_bridge_v2"
->
emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(mongodb_action));
fields("post_rs") ->
fields(mongodb_rs) ++ type_and_name_fields(mongodb_rs);
fields(mongodb_rs) ++ emqx_bridge_schema:type_and_name_fields(mongodb_rs);
fields("post_sharded") ->
fields(mongodb_sharded) ++ type_and_name_fields(mongodb_sharded);
fields(mongodb_sharded) ++ emqx_bridge_schema:type_and_name_fields(mongodb_sharded);
fields("post_single") ->
fields(mongodb_single) ++ type_and_name_fields(mongodb_single);
fields(mongodb_single) ++ emqx_bridge_schema:type_and_name_fields(mongodb_single);
fields("put_rs") ->
fields(mongodb_rs);
fields("put_sharded") ->
@ -131,22 +133,24 @@ fields("put_single") ->
fields("get_rs") ->
emqx_bridge_schema:status_fields() ++
fields(mongodb_rs) ++
type_and_name_fields(mongodb_rs);
emqx_bridge_schema:type_and_name_fields(mongodb_rs);
fields("get_sharded") ->
emqx_bridge_schema:status_fields() ++
fields(mongodb_sharded) ++
type_and_name_fields(mongodb_sharded);
emqx_bridge_schema:type_and_name_fields(mongodb_sharded);
fields("get_single") ->
emqx_bridge_schema:status_fields() ++
fields(mongodb_single) ++
type_and_name_fields(mongodb_single).
emqx_bridge_schema:type_and_name_fields(mongodb_single).
bridge_v2_examples(Method) ->
[
#{
<<"mongodb">> => #{
summary => <<"MongoDB Action">>,
value => action_values(Method)
value => emqx_bridge_v2_schema:action_values(
Method, mongodb, mongodb, #{parameters => #{collection => <<"mycol">>}}
)
}
}
].
@ -178,19 +182,25 @@ connector_examples(Method) ->
#{
<<"mongodb_rs">> => #{
summary => <<"MongoDB Replica Set Connector">>,
value => connector_values(mongodb_rs, Method)
value => emqx_connector_schema:connector_values(
Method, mongodb_rs, #{parameters => connector_values()}
)
}
},
#{
<<"mongodb_sharded">> => #{
summary => <<"MongoDB Sharded Connector">>,
value => connector_values(mongodb_sharded, Method)
value => emqx_connector_schema:connector_values(
Method, mongodb_sharded, #{parameters => connector_values()}
)
}
},
#{
<<"mongodb_single">> => #{
summary => <<"MongoDB Standalone Connector">>,
value => connector_values(mongodb_single, Method)
value => emqx_connector_schema:connector_values(
Method, mongodb_single, #{parameters => connector_values()}
)
}
}
].
@ -224,40 +234,6 @@ desc(_) ->
%% Internal fns
%%=================================================================================================
type_and_name_fields(MongoType) ->
[
{type, mk(MongoType, #{required => true, desc => ?DESC("desc_type")})},
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}
].
connector_values(Type, Method) ->
lists:foldl(
fun(M1, M2) ->
maps:merge(M1, M2)
end,
#{
description => <<"My example connector">>,
parameters => mongo_type_opts(Type)
},
[
common_values(),
method_values(mongodb, Method)
]
).
action_values(Method) ->
maps:merge(
method_values(mongodb, Method),
#{
description => <<"My example action">>,
enable => true,
connector => <<"my_mongodb_connector">>,
parameters => #{
collection => <<"mycol">>
}
}
).
values(MongoType, Method) ->
maps:merge(
mongo_type_opts(MongoType),
@ -295,10 +271,10 @@ bridge_values(Type, _Method) ->
type => TypeBin,
collection => <<"mycol">>
},
common_values()
connector_values()
).
common_values() ->
connector_values() ->
#{
enable => true,
database => <<"mqtt">>,
@ -307,26 +283,3 @@ common_values() ->
username => <<"myuser">>,
password => <<"******">>
}.
method_values(Type, post) ->
TypeBin = atom_to_binary(Type),
#{
name => <<TypeBin/binary, "_demo">>,
type => TypeBin
};
method_values(Type, get) ->
maps:merge(
method_values(Type, post),
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
],
actions => [<<"my_action">>]
}
);
method_values(_Type, put) ->
#{}.

View File

@ -30,7 +30,11 @@ bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
ActionParametersKeys = schema_keys(action_parameters),
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
ActionConfig#{<<"connector">> => ConnectorName}.
emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
ActionConfig#{<<"connector">> => ConnectorName}
).
bridge_v1_config_to_connector_config(BridgeV1Config) ->
ActionTopLevelKeys = schema_keys(mongodb_action),
@ -42,10 +46,7 @@ bridge_v1_config_to_connector_config(BridgeV1Config) ->
ConnConfig0 = make_config_map(ConnectorKeys, ConnectorParametersKeys, BridgeV1Config),
emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun(ResourceOpts) ->
CommonROSubfields = emqx_connector_schema:common_resource_opts_subfields_bin(),
maps:with(CommonROSubfields, ResourceOpts)
end,
fun emqx_connector_schema:project_to_connector_resource_opts/1,
ConnConfig0
).

View File

@ -58,10 +58,10 @@ init_per_group(Type = rs, Config) ->
MongoPort = list_to_integer(os:getenv("MONGO_RS_PORT", "27017")),
case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of
true ->
ok = start_apps(),
emqx_mgmt_api_test_util:init_suite(),
Apps = start_apps(Config),
{Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type, Config),
[
{apps, Apps},
{mongo_host, MongoHost},
{mongo_port, MongoPort},
{mongo_config, MongoConfig},
@ -77,10 +77,10 @@ init_per_group(Type = sharded, Config) ->
MongoPort = list_to_integer(os:getenv("MONGO_SHARDED_PORT", "27017")),
case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of
true ->
ok = start_apps(),
emqx_mgmt_api_test_util:init_suite(),
Apps = start_apps(Config),
{Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type, Config),
[
{apps, Apps},
{mongo_host, MongoHost},
{mongo_port, MongoPort},
{mongo_config, MongoConfig},
@ -96,8 +96,7 @@ init_per_group(Type = single, Config) ->
MongoPort = list_to_integer(os:getenv("MONGO_SINGLE_PORT", "27017")),
case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of
true ->
ok = start_apps(),
emqx_mgmt_api_test_util:init_suite(),
Apps = start_apps(Config),
%% NOTE: `mongo-single` has auth enabled, see `credentials.env`.
AuthSource = bin(os:getenv("MONGO_AUTHSOURCE", "admin")),
Username = bin(os:getenv("MONGO_USERNAME", "")),
@ -113,6 +112,7 @@ init_per_group(Type = single, Config) ->
],
{Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type, NConfig),
[
{apps, Apps},
{mongo_host, MongoHost},
{mongo_port, MongoPort},
{mongo_config, MongoConfig},
@ -124,6 +124,14 @@ init_per_group(Type = single, Config) ->
{skip, no_mongo}
end.
end_per_group(Type, Config) when
Type =:= rs;
Type =:= sharded;
Type =:= single
->
Apps = ?config(apps, Config),
emqx_cth_suite:stop(Apps),
ok;
end_per_group(_Type, _Config) ->
ok.
@ -131,18 +139,6 @@ init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
emqx_mgmt_api_test_util:end_suite(),
ok = emqx_common_test_helpers:stop_apps(
[
emqx_management,
emqx_bridge_mongodb,
emqx_mongodb,
emqx_bridge,
emqx_connector,
emqx_rule_engine,
emqx_conf
]
),
ok.
init_per_testcase(_Testcase, Config) ->
@ -162,23 +158,22 @@ end_per_testcase(_Testcase, Config) ->
%% Helper fns
%%------------------------------------------------------------------------------
start_apps() ->
ensure_loaded(),
%% some configs in emqx_conf app are mandatory,
%% we want to make sure they are loaded before
%% ekka start in emqx_common_test_helpers:start_apps/1
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
ok = emqx_common_test_helpers:start_apps(
start_apps(Config) ->
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
emqx_rule_engine,
emqx_connector,
emqx_bridge,
emqx_mongodb,
emqx_bridge_mongodb,
emqx_management
]
).
emqx_rule_engine,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
{ok, _Api} = emqx_common_test_http:create_default_app(),
Apps.
ensure_loaded() ->
_ = application:load(emqtt),
@ -221,6 +216,15 @@ mongo_config(MongoHost, MongoPort0, rs = Type, Config) ->
"\n resource_opts = {"
"\n query_mode = ~s"
"\n worker_pool_size = 1"
"\n health_check_interval = 15s"
"\n start_timeout = 5s"
"\n start_after_created = true"
"\n request_ttl = 45s"
"\n inflight_window = 100"
"\n max_buffer_bytes = 256MB"
"\n buffer_mode = memory_only"
"\n metrics_flush_interval = 5s"
"\n resume_interval = 15s"
"\n }"
"\n }",
[
@ -248,6 +252,15 @@ mongo_config(MongoHost, MongoPort0, sharded = Type, Config) ->
"\n resource_opts = {"
"\n query_mode = ~s"
"\n worker_pool_size = 1"
"\n health_check_interval = 15s"
"\n start_timeout = 5s"
"\n start_after_created = true"
"\n request_ttl = 45s"
"\n inflight_window = 100"
"\n max_buffer_bytes = 256MB"
"\n buffer_mode = memory_only"
"\n metrics_flush_interval = 5s"
"\n resume_interval = 15s"
"\n }"
"\n }",
[
@ -278,6 +291,15 @@ mongo_config(MongoHost, MongoPort0, single = Type, Config) ->
"\n resource_opts = {"
"\n query_mode = ~s"
"\n worker_pool_size = 1"
"\n health_check_interval = 15s"
"\n start_timeout = 5s"
"\n start_after_created = true"
"\n request_ttl = 45s"
"\n inflight_window = 100"
"\n max_buffer_bytes = 256MB"
"\n buffer_mode = memory_only"
"\n metrics_flush_interval = 5s"
"\n resume_interval = 15s"
"\n }"
"\n }",
[

View File

@ -144,7 +144,12 @@ connector_config(Name, Config) ->
<<"srv_record">> => false,
<<"username">> => Username,
<<"password">> => iolist_to_binary(["file://", PassFile]),
<<"auth_source">> => AuthSource
<<"auth_source">> => AuthSource,
<<"resource_opts">> => #{
<<"health_check_interval">> => <<"15s">>,
<<"start_after_created">> => true,
<<"start_timeout">> => <<"5s">>
}
},
InnerConfigMap = serde_roundtrip(InnerConfigMap0),
parse_and_check_connector_config(InnerConfigMap, Name).
@ -166,8 +171,21 @@ bridge_config(Name, ConnectorId) ->
<<"connector">> => ConnectorId,
<<"parameters">> =>
#{},
<<"local_topic">> => <<"t/aeh">>
%%,
<<"local_topic">> => <<"t/mongo">>,
<<"resource_opts">> => #{
<<"batch_size">> => 1,
<<"batch_time">> => <<"0ms">>,
<<"buffer_mode">> => <<"memory_only">>,
<<"buffer_seg_bytes">> => <<"10MB">>,
<<"health_check_interval">> => <<"15s">>,
<<"inflight_window">> => 100,
<<"max_buffer_bytes">> => <<"256MB">>,
<<"metrics_flush_interval">> => <<"1s">>,
<<"query_mode">> => <<"sync">>,
<<"request_ttl">> => <<"45s">>,
<<"resume_interval">> => <<"15s">>,
<<"worker_pool_size">> => <<"1">>
}
},
InnerConfigMap = serde_roundtrip(InnerConfigMap0),
parse_and_check_bridge_config(InnerConfigMap, Name).

View File

@ -160,12 +160,12 @@ fields("put") ->
fields("config");
fields("get") ->
emqx_bridge_schema:status_fields() ++ fields("post");
fields("get_bridge_v2") ->
emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2");
fields("post_bridge_v2") ->
[type_field(), name_field() | fields(mysql_action)];
fields("put_bridge_v2") ->
fields(mysql_action);
fields(Field) when
Field == "get_bridge_v2";
Field == "post_bridge_v2";
Field == "put_bridge_v2"
->
emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(mysql_action));
fields(Field) when
Field == "get_connector";
Field == "put_connector";

View File

@ -44,15 +44,17 @@ bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
ActionParametersKeys = schema_keys(action_parameters),
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
ActionConfig#{<<"connector">> => ConnectorName}.
emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
ActionConfig#{<<"connector">> => ConnectorName}
).
bridge_v1_config_to_connector_config(BridgeV1Config) ->
ConnectorKeys = schema_keys("config_connector"),
ResourceOptsKeys = schema_keys(connector_resource_opts),
maps:update_with(
emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun(ResourceOpts) -> maps:with(ResourceOptsKeys, ResourceOpts) end,
#{},
fun emqx_connector_schema:project_to_connector_resource_opts/1,
maps:with(ConnectorKeys, BridgeV1Config)
).

View File

@ -35,13 +35,18 @@ on_add_channel(
) ->
ChannelConfig1 = emqx_utils_maps:unindent(parameters, ChannelConfig0),
QueryTemplates = emqx_mysql:parse_prepare_sql(ChannelId, ChannelConfig1),
case validate_sql_type(ChannelId, ChannelConfig1, QueryTemplates) of
ok ->
ChannelConfig2 = maps:merge(ChannelConfig1, QueryTemplates),
ChannelConfig = set_prepares(ChannelConfig2, ConnectorState),
State = State0#{
channels => maps:put(ChannelId, ChannelConfig, Channels),
connector_state => ConnectorState
},
{ok, State}.
{ok, State};
{error, Error} ->
{error, Error}
end.
on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
case maps:get(ChannelId, Channels) of
@ -116,11 +121,13 @@ on_batch_query(InstanceId, BatchRequest, _State = #{connector_state := Connector
on_remove_channel(
_InstanceId, #{channels := Channels, connector_state := ConnectorState} = State, ChannelId
) ->
) when is_map_key(ChannelId, Channels) ->
ChannelConfig = maps:get(ChannelId, Channels),
emqx_mysql:unprepare_sql(maps:merge(ChannelConfig, ConnectorState)),
NewState = State#{channels => maps:remove(ChannelId, Channels)},
{ok, NewState}.
{ok, NewState};
on_remove_channel(_InstanceId, State, _ChannelId) ->
{ok, State}.
-spec on_start(binary(), hocon:config()) ->
{ok, #{connector_state := emqx_mysql:state(), channels := map()}} | {error, _}.
@ -148,3 +155,43 @@ set_prepares(ChannelConfig, ConnectorState) ->
#{prepares := Prepares} =
emqx_mysql:init_prepare(maps:merge(ConnectorState, ChannelConfig)),
ChannelConfig#{prepares => Prepares}.
validate_sql_type(ChannelId, ChannelConfig, #{query_templates := QueryTemplates}) ->
Batch =
case emqx_utils_maps:deep_get([resource_opts, batch_size], ChannelConfig) of
N when N > 1 -> batch;
_ -> single
end,
BatchKey = {ChannelId, batch},
SingleKey = {ChannelId, prepstmt},
case {QueryTemplates, Batch} of
{#{BatchKey := _}, batch} ->
ok;
{#{SingleKey := _}, single} ->
ok;
{_, batch} ->
%% try to provide helpful info
SQL = maps:get(sql, ChannelConfig),
Type = emqx_utils_sql:get_statement_type(SQL),
ErrorContext0 = #{
reason => failed_to_prepare_statement,
statement_type => Type,
operation_type => Batch
},
ErrorContext = emqx_utils_maps:put_if(
ErrorContext0,
hint,
<<"UPDATE statements are not supported for batch operations">>,
Type =:= update
),
{error, ErrorContext};
_ ->
SQL = maps:get(sql, ChannelConfig),
Type = emqx_utils_sql:get_statement_type(SQL),
ErrorContext = #{
reason => failed_to_prepare_statement,
statement_type => Type,
operation_type => Batch
},
{error, ErrorContext}
end.

View File

@ -31,6 +31,8 @@
-define(WORKER_POOL_SIZE, 4).
-define(ACTION_TYPE, mysql).
-import(emqx_common_test_helpers, [on_exit/1]).
%%------------------------------------------------------------------------------
@ -45,7 +47,14 @@ all() ->
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
NonBatchCases = [t_write_timeout, t_uninitialized_prepared_statement],
NonBatchCases = [
t_write_timeout,
t_uninitialized_prepared_statement,
t_non_batch_update_is_allowed
],
OnlyBatchCases = [
t_batch_update_is_forbidden
],
BatchingGroups = [
{group, with_batch},
{group, without_batch}
@ -57,7 +66,7 @@ groups() ->
{async, BatchingGroups},
{sync, BatchingGroups},
{with_batch, TCs -- NonBatchCases},
{without_batch, TCs}
{without_batch, TCs -- OnlyBatchCases}
].
init_per_group(tcp, Config) ->
@ -103,6 +112,8 @@ end_per_group(_Group, _Config) ->
ok.
init_per_suite(Config) ->
emqx_common_test_helpers:clear_screen(),
Config.
end_per_suite(_Config) ->
@ -151,6 +162,9 @@ common_init(Config0) ->
{mysql_config, MysqlConfig},
{mysql_bridge_type, BridgeType},
{mysql_name, Name},
{bridge_type, BridgeType},
{bridge_name, Name},
{bridge_config, MysqlConfig},
{proxy_host, ProxyHost},
{proxy_port, ProxyPort}
| Config0
@ -182,6 +196,15 @@ mysql_config(BridgeType, Config) ->
" batch_size = ~b\n"
" query_mode = ~s\n"
" worker_pool_size = ~b\n"
" health_check_interval = 15s\n"
" start_timeout = 5s\n"
" inflight_window = 100\n"
" max_buffer_bytes = 256MB\n"
" buffer_mode = memory_only\n"
" batch_time = 0\n"
" metrics_flush_interval = 5s\n"
" buffer_seg_bytes = 10MB\n"
" start_after_created = true\n"
" }\n"
" ssl = {\n"
" enable = ~w\n"
@ -865,3 +888,91 @@ t_nested_payload_template(Config) ->
connect_and_get_payload(Config)
),
ok.
t_batch_update_is_forbidden(Config) ->
?check_trace(
begin
Overrides = #{
<<"sql">> =>
<<
"UPDATE mqtt_test "
"SET arrived = FROM_UNIXTIME(${timestamp}/1000) "
"WHERE payload = ${payload.value}"
>>
},
ProbeRes = emqx_bridge_testlib:probe_bridge_api(Config, Overrides),
?assertMatch({error, {{_, 400, _}, _, _Body}}, ProbeRes),
{error, {{_, 400, _}, _, ProbeBodyRaw}} = ProbeRes,
?assertEqual(
match,
re:run(
ProbeBodyRaw,
<<"UPDATE statements are not supported for batch operations">>,
[global, {capture, none}]
)
),
CreateRes = emqx_bridge_testlib:create_bridge_api(Config, Overrides),
?assertMatch(
{ok, {{_, 201, _}, _, #{<<"status">> := <<"disconnected">>}}},
CreateRes
),
{ok, {{_, 201, _}, _, #{<<"status_reason">> := Reason}}} = CreateRes,
?assertEqual(
match,
re:run(
Reason,
<<"UPDATE statements are not supported for batch operations">>,
[global, {capture, none}]
)
),
ok
end,
[]
),
ok.
t_non_batch_update_is_allowed(Config) ->
?check_trace(
begin
BridgeName = ?config(bridge_name, Config),
Overrides = #{
<<"resource_opts">> => #{<<"metrics_flush_interval">> => <<"500ms">>},
<<"sql">> =>
<<
"UPDATE mqtt_test "
"SET arrived = FROM_UNIXTIME(${timestamp}/1000) "
"WHERE payload = ${payload.value}"
>>
},
ProbeRes = emqx_bridge_testlib:probe_bridge_api(Config, Overrides),
?assertMatch({ok, {{_, 204, _}, _, _Body}}, ProbeRes),
?assertMatch(
{ok, {{_, 201, _}, _, #{<<"status">> := <<"connected">>}}},
emqx_bridge_testlib:create_bridge_api(Config, Overrides)
),
{ok, #{
<<"id">> := RuleId,
<<"from">> := [Topic]
}} = create_rule_and_action_http(Config),
Payload = emqx_utils_json:encode(#{value => <<"aaaa">>}),
Message = emqx_message:make(Topic, Payload),
{_, {ok, _}} =
?wait_async_action(
emqx:publish(Message),
#{?snk_kind := mysql_connector_query_return},
10_000
),
ActionId = emqx_bridge_v2:id(?ACTION_TYPE, BridgeName),
?assertEqual(1, emqx_resource_metrics:matched_get(ActionId)),
?retry(
_Sleep0 = 200,
_Attempts0 = 10,
?assertEqual(1, emqx_resource_metrics:success_get(ActionId))
),
?assertEqual(1, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.success')),
ok
end,
[]
),
ok.

View File

@ -100,11 +100,18 @@ init_per_group(timescale, Config0) ->
init_per_group(_Group, Config) ->
Config.
end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch ->
connect_and_drop_table(Config),
end_per_group(Group, Config) when
Group =:= with_batch;
Group =:= without_batch;
Group =:= matrix;
Group =:= timescale
->
Apps = ?config(apps, Config),
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
connect_and_drop_table(Config),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
ok = emqx_cth_suite:stop(Apps),
ok;
end_per_group(_Group, _Config) ->
ok.
@ -113,8 +120,6 @@ init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
emqx_mgmt_api_test_util:end_suite(),
ok = emqx_common_test_helpers:stop_apps([emqx, emqx_postgresql, emqx_conf, emqx_bridge]),
ok.
init_per_testcase(_Testcase, Config) ->
@ -147,14 +152,31 @@ common_init(Config0) ->
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
% Ensure enterprise bridge module is loaded
ok = emqx_common_test_helpers:start_apps([emqx, emqx_postgresql, emqx_conf, emqx_bridge]),
_ = emqx_bridge_enterprise:module_info(),
emqx_mgmt_api_test_util:init_suite(),
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
emqx_connector,
emqx_bridge,
emqx_bridge_pgsql,
emqx_rule_engine,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{work_dir => emqx_cth_suite:work_dir(Config0)}
),
{ok, _Api} = emqx_common_test_http:create_default_app(),
%% ok = emqx_common_test_helpers:start_apps([emqx, emqx_postgresql, emqx_conf, emqx_bridge]),
%% _ = emqx_bridge_enterprise:module_info(),
%% emqx_mgmt_api_test_util:init_suite(),
% Connect to pgsql directly and create the table
connect_and_create_table(Config0),
{Name, PGConf} = pgsql_config(BridgeType, Config0),
Config =
[
{apps, Apps},
{pgsql_config, PGConf},
{pgsql_bridge_type, BridgeType},
{pgsql_name, Name},
@ -198,6 +220,16 @@ pgsql_config(BridgeType, Config) ->
"\n request_ttl = 500ms"
"\n batch_size = ~b"
"\n query_mode = ~s"
"\n worker_pool_size = 1"
"\n health_check_interval = 15s"
"\n start_after_created = true"
"\n start_timeout = 5s"
"\n inflight_window = 100"
"\n max_buffer_bytes = 256MB"
"\n buffer_seg_bytes = 10MB"
"\n buffer_mode = memory_only"
"\n metrics_flush_interval = 5s"
"\n resume_interval = 15s"
"\n }"
"\n ssl = {"
"\n enable = ~w"
@ -218,6 +250,9 @@ pgsql_config(BridgeType, Config) ->
),
{Name, parse_and_check(ConfigString, BridgeType, Name)}.
default_sql() ->
?SQL_BRIDGE.
create_passfile(BridgeType, Config) ->
Filename = binary_to_list(BridgeType) ++ ".passfile",
Filepath = filename:join(?config(priv_dir, Config), Filename),
@ -689,14 +724,13 @@ t_missing_table(Config) ->
t_table_removed(Config) ->
Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config),
%%ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
?check_trace(
begin
connect_and_create_table(Config),
?assertMatch({ok, _}, create_bridge(Config)),
?retry(
_Sleep = 1_000,
_Attempts = 20,
_Sleep = 100,
_Attempts = 200,
?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name))
),
connect_and_drop_table(Config),

View File

@ -0,0 +1,233 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_v2_pgsql_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(BRIDGE_TYPE, pgsql).
-define(BRIDGE_TYPE_BIN, <<"pgsql">>).
-define(CONNECTOR_TYPE, pgsql).
-define(CONNECTOR_TYPE_BIN, <<"pgsql">>).
-import(emqx_common_test_helpers, [on_exit/1]).
-import(emqx_utils_conv, [bin/1]).
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
PostgresHost = os:getenv("PGSQL_TCP_HOST", "toxiproxy"),
PostgresPort = list_to_integer(os:getenv("PGSQL_TCP_PORT", "5432")),
case emqx_common_test_helpers:is_tcp_server_available(PostgresHost, PostgresPort) of
true ->
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
emqx_connector,
emqx_bridge,
emqx_bridge_pgsql,
emqx_rule_engine,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
{ok, Api} = emqx_common_test_http:create_default_app(),
NConfig = [
{apps, Apps},
{api, Api},
{pgsql_host, PostgresHost},
{pgsql_port, PostgresPort},
{enable_tls, false},
{postgres_host, PostgresHost},
{postgres_port, PostgresPort}
| Config
],
emqx_bridge_pgsql_SUITE:connect_and_create_table(NConfig),
NConfig;
false ->
case os:getenv("IS_CI") of
"yes" ->
throw(no_postgres);
_ ->
{skip, no_postgres}
end
end.
end_per_suite(Config) ->
Apps = ?config(apps, Config),
emqx_cth_suite:stop(Apps),
ok.
init_per_testcase(TestCase, Config) ->
common_init_per_testcase(TestCase, Config).
common_init_per_testcase(TestCase, Config) ->
ct:timetrap(timer:seconds(60)),
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
emqx_config:delete_override_conf_files(),
UniqueNum = integer_to_binary(erlang:unique_integer()),
Name = iolist_to_binary([atom_to_binary(TestCase), UniqueNum]),
Username = <<"root">>,
Password = <<"public">>,
Passfile = filename:join(?config(priv_dir, Config), "passfile"),
ok = file:write_file(Passfile, Password),
NConfig = [
{postgres_username, Username},
{postgres_password, Password},
{postgres_passfile, Passfile}
| Config
],
ConnectorConfig = connector_config(Name, NConfig),
BridgeConfig = bridge_config(Name, Name),
ok = snabbkaffe:start_trace(),
[
{connector_type, ?CONNECTOR_TYPE},
{connector_name, Name},
{connector_config, ConnectorConfig},
{bridge_type, ?BRIDGE_TYPE},
{bridge_name, Name},
{bridge_config, BridgeConfig}
| NConfig
].
end_per_testcase(_Testcase, Config) ->
case proplists:get_bool(skip_does_not_apply, Config) of
true ->
ok;
false ->
emqx_bridge_pgsql_SUITE:connect_and_clear_table(Config),
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
emqx_common_test_helpers:call_janitor(60_000),
ok = snabbkaffe:stop(),
ok
end.
%%------------------------------------------------------------------------------
%% Helper fns
%%------------------------------------------------------------------------------
connector_config(Name, Config) ->
PostgresHost = ?config(postgres_host, Config),
PostgresPort = ?config(postgres_port, Config),
Username = ?config(postgres_username, Config),
PassFile = ?config(postgres_passfile, Config),
InnerConfigMap0 =
#{
<<"enable">> => true,
<<"database">> => <<"mqtt">>,
<<"server">> => iolist_to_binary([PostgresHost, ":", integer_to_binary(PostgresPort)]),
<<"pool_size">> => 8,
<<"username">> => Username,
<<"password">> => iolist_to_binary(["file://", PassFile]),
<<"resource_opts">> => #{
<<"health_check_interval">> => <<"15s">>,
<<"start_after_created">> => true,
<<"start_timeout">> => <<"5s">>
}
},
InnerConfigMap = serde_roundtrip(InnerConfigMap0),
parse_and_check_connector_config(InnerConfigMap, Name).
parse_and_check_connector_config(InnerConfigMap, Name) ->
TypeBin = ?CONNECTOR_TYPE_BIN,
RawConf = #{<<"connectors">> => #{TypeBin => #{Name => InnerConfigMap}}},
#{<<"connectors">> := #{TypeBin := #{Name := Config}}} =
hocon_tconf:check_plain(emqx_connector_schema, RawConf, #{
required => false, atom_key => false
}),
ct:pal("parsed config: ~p", [Config]),
InnerConfigMap.
bridge_config(Name, ConnectorId) ->
InnerConfigMap0 =
#{
<<"enable">> => true,
<<"connector">> => ConnectorId,
<<"parameters">> =>
#{<<"sql">> => emqx_bridge_pgsql_SUITE:default_sql()},
<<"local_topic">> => <<"t/postgres">>,
<<"resource_opts">> => #{
<<"batch_size">> => 1,
<<"batch_time">> => <<"0ms">>,
<<"buffer_mode">> => <<"memory_only">>,
<<"buffer_seg_bytes">> => <<"10MB">>,
<<"health_check_interval">> => <<"15s">>,
<<"inflight_window">> => 100,
<<"max_buffer_bytes">> => <<"256MB">>,
<<"metrics_flush_interval">> => <<"1s">>,
<<"query_mode">> => <<"sync">>,
<<"request_ttl">> => <<"45s">>,
<<"resume_interval">> => <<"15s">>,
<<"worker_pool_size">> => <<"1">>
}
},
InnerConfigMap = serde_roundtrip(InnerConfigMap0),
parse_and_check_bridge_config(InnerConfigMap, Name).
%% check it serializes correctly
serde_roundtrip(InnerConfigMap0) ->
IOList = hocon_pp:do(InnerConfigMap0, #{}),
{ok, InnerConfigMap} = hocon:binary(IOList),
InnerConfigMap.
parse_and_check_bridge_config(InnerConfigMap, Name) ->
TypeBin = ?BRIDGE_TYPE_BIN,
RawConf = #{<<"bridges">> => #{TypeBin => #{Name => InnerConfigMap}}},
hocon_tconf:check_plain(emqx_bridge_v2_schema, RawConf, #{required => false, atom_key => false}),
InnerConfigMap.
make_message() ->
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
#{
clientid => ClientId,
payload => Payload,
timestamp => 1668602148000
}.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_start_stop(Config) ->
emqx_bridge_v2_testlib:t_start_stop(Config, postgres_stopped),
ok.
t_create_via_http(Config) ->
emqx_bridge_v2_testlib:t_create_via_http(Config),
ok.
t_on_get_status(Config) ->
emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
ok.
t_sync_query(Config) ->
ok = emqx_bridge_v2_testlib:t_sync_query(
Config,
fun make_message/0,
fun(Res) -> ?assertMatch({ok, _}, Res) end,
postgres_bridge_connector_on_query_return
),
ok.

View File

@ -122,7 +122,9 @@ fields("get_cluster") ->
method_fields(get, redis_cluster);
%% old bridge v1 schema
fields(Type) when
Type == redis_single orelse Type == redis_sentinel orelse Type == redis_cluster
Type == redis_single;
Type == redis_sentinel;
Type == redis_cluster
->
redis_bridge_common_fields(Type) ++
connector_fields(Type);

View File

@ -29,14 +29,12 @@ connector_type_name() -> redis.
schema_module() -> ?SCHEMA_MODULE.
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
fix_v1_type(
maps:merge(
maps:without(
[<<"connector">>],
map_unindent(<<"parameters">>, ActionConfig)
),
map_unindent(<<"parameters">>, ConnectorConfig)
)
).
bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
@ -44,12 +42,11 @@ bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
ActionParametersKeys = schema_keys(emqx_bridge_redis:fields(action_parameters)),
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
ActionConfig0 = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
ActionConfig = emqx_utils_maps:update_if_present(
emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
ActionConfig0
),
ActionConfig#{<<"connector">> => ConnectorName}.
ActionConfig0#{<<"connector">> => ConnectorName}
).
bridge_v1_config_to_connector_config(BridgeV1Config) ->
ActionTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields(redis_action)),
@ -78,9 +75,6 @@ bridge_v1_type_name() ->
bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"redis_type">> := Type}}, _}) ->
v1_type(Type).
fix_v1_type(#{<<"redis_type">> := RedisType} = Conf) ->
Conf#{<<"type">> => v1_type(RedisType)}.
v1_type(<<"single">>) -> redis_single;
v1_type(<<"sentinel">>) -> redis_sentinel;
v1_type(<<"cluster">>) -> redis_cluster.

View File

@ -82,9 +82,13 @@ on_start(InstId, Config) ->
end.
on_stop(InstId, #{conn_st := RedisConnSt}) ->
emqx_redis:on_stop(InstId, RedisConnSt);
Res = emqx_redis:on_stop(InstId, RedisConnSt),
?tp(redis_bridge_stopped, #{instance_id => InstId}),
Res;
on_stop(InstId, undefined = _State) ->
emqx_redis:on_stop(InstId, undefined).
Res = emqx_redis:on_stop(InstId, undefined),
?tp(redis_bridge_stopped, #{instance_id => InstId}),
Res.
on_get_status(InstId, #{conn_st := RedisConnSt}) ->
emqx_redis:on_get_status(InstId, RedisConnSt).
@ -98,7 +102,7 @@ on_query(InstId, {cmd, Cmd}, #{conn_st := RedisConnSt}) ->
Result = query(InstId, {cmd, Cmd}, RedisConnSt),
?tp(
redis_bridge_connector_send_done,
#{cmd => Cmd, batch => false, mode => sync, result => Result}
#{instance_id => InstId, cmd => Cmd, batch => false, mode => sync, result => Result}
),
Result;
on_query(
@ -115,7 +119,7 @@ on_query(
Result = query(InstId, {cmd, Cmd}, RedisConnSt),
?tp(
redis_bridge_connector_send_done,
#{cmd => Cmd, batch => false, mode => sync, result => Result}
#{instance_id => InstId, cmd => Cmd, batch => false, mode => sync, result => Result}
),
Result;
Error ->
@ -135,6 +139,7 @@ on_batch_query(
?tp(
redis_bridge_connector_send_done,
#{
instance_id => InstId,
batch_data => BatchData,
batch_size => length(BatchData),
batch => true,

View File

@ -50,7 +50,6 @@ fields("config_connector") ->
#{required => true, desc => ?DESC(redis_parameters)}
)}
] ++
emqx_redis:redis_fields() ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts) ++
emqx_connector_schema_lib:ssl_fields();
fields(connector_resource_opts) ->

View File

@ -59,7 +59,11 @@ all() -> [{group, transports}, {group, rest}].
suite() -> [{timetrap, {minutes, 20}}].
groups() ->
ResourceSpecificTCs = [t_create_delete_bridge],
ResourceSpecificTCs = [
t_create_delete_bridge,
t_create_via_http,
t_start_stop
],
TCs = emqx_common_test_helpers:all(?MODULE) -- ResourceSpecificTCs,
TypeGroups = [
{group, redis_single},
@ -130,10 +134,13 @@ wait_for_ci_redis(Checks, Config) ->
emqx_resource,
emqx_connector,
emqx_bridge,
emqx_rule_engine
emqx_rule_engine,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
{ok, _Api} = emqx_common_test_http:create_default_app(),
[
{apps, Apps},
{proxy_host, ProxyHost},
@ -177,9 +184,8 @@ init_per_testcase(Testcase, Config0) ->
IsBatch = (BatchMode =:= batch_on),
BridgeConfig0 = maps:merge(RedisConnConfig, ?COMMON_REDIS_OPTS),
BridgeConfig1 = BridgeConfig0#{<<"resource_opts">> => ResourceConfig},
BridgeType = list_to_atom(atom_to_list(RedisType) ++ "_producer"),
[
{bridge_type, BridgeType},
{bridge_type, RedisType},
{bridge_config, BridgeConfig1},
{is_batch, IsBatch}
| Config
@ -425,6 +431,14 @@ t_create_disconnected(Config) ->
),
ok = emqx_bridge:remove(Type, Name).
t_create_via_http(Config) ->
ok = emqx_bridge_testlib:t_create_via_http(Config),
ok.
t_start_stop(Config) ->
ok = emqx_bridge_testlib:t_start_stop(Config, redis_bridge_stopped),
ok.
%%------------------------------------------------------------------------------
%% Helper functions
%%------------------------------------------------------------------------------
@ -599,7 +613,14 @@ toxiproxy_redis_bridge_config() ->
<<"worker_pool_size">> => <<"1">>,
<<"batch_size">> => integer_to_binary(?BATCH_SIZE),
<<"health_check_interval">> => <<"1s">>,
<<"start_timeout">> => <<"15s">>
<<"max_buffer_bytes">> => <<"256MB">>,
<<"buffer_seg_bytes">> => <<"10MB">>,
<<"request_ttl">> => <<"45s">>,
<<"inflight_window">> => <<"100">>,
<<"resume_interval">> => <<"1s">>,
<<"metrics_flush_interval">> => <<"1s">>,
<<"start_after_created">> => true,
<<"start_timeout">> => <<"5s">>
}
},
maps:merge(Conf0, ?COMMON_REDIS_OPTS).
@ -611,7 +632,14 @@ username_password_redis_bridge_config() ->
<<"worker_pool_size">> => <<"1">>,
<<"batch_size">> => integer_to_binary(?BATCH_SIZE),
<<"health_check_interval">> => <<"1s">>,
<<"start_timeout">> => <<"15s">>
<<"max_buffer_bytes">> => <<"256MB">>,
<<"buffer_seg_bytes">> => <<"10MB">>,
<<"request_ttl">> => <<"45s">>,
<<"inflight_window">> => <<"100">>,
<<"resume_interval">> => <<"15s">>,
<<"metrics_flush_interval">> => <<"1s">>,
<<"start_after_created">> => true,
<<"start_timeout">> => <<"5s">>
}
},
Conf1 = maps:merge(Conf0, ?COMMON_REDIS_OPTS),

View File

@ -0,0 +1,339 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_v2_redis_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(BRIDGE_TYPE, redis).
-define(BRIDGE_TYPE_BIN, <<"redis">>).
-define(CONNECTOR_TYPE, redis).
-define(CONNECTOR_TYPE_BIN, <<"redis">>).
-import(emqx_common_test_helpers, [on_exit/1]).
-import(emqx_utils_conv, [bin/1]).
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
all() ->
All0 = emqx_common_test_helpers:all(?MODULE),
All = All0 -- matrix_testcases(),
Groups = lists:map(fun({G, _, _}) -> {group, G} end, groups()),
Groups ++ All.
groups() ->
emqx_common_test_helpers:matrix_to_groups(?MODULE, matrix_testcases()).
matrix_testcases() ->
[
t_start_stop,
t_create_via_http,
t_on_get_status,
t_sync_query
].
init_per_suite(Config) ->
TestHosts = [
{"redis", 6379},
{"redis-tls", 6380},
{"redis-sentinel", 26379},
{"redis-sentinel-tls", 26380},
{"redis-cluster-1", 6379},
{"redis-cluster-2", 6379},
{"redis-cluster-3", 6379},
{"redis-cluster-tls-1", 6389},
{"redis-cluster-tls-2", 6389},
{"redis-cluster-tls-3", 6389}
],
case emqx_common_test_helpers:is_all_tcp_servers_available(TestHosts) of
true ->
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
emqx_connector,
emqx_bridge_redis,
emqx_bridge,
emqx_rule_engine,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
{ok, Api} = emqx_common_test_http:create_default_app(),
NConfig = [
{apps, Apps},
{api, Api},
{enable_tls, false}
| Config
],
NConfig;
false ->
case os:getenv("IS_CI") of
"yes" ->
throw(no_redis);
_ ->
{skip, no_redis}
end
end.
end_per_suite(Config) ->
Apps = ?config(apps, Config),
emqx_cth_suite:stop(Apps),
ok.
init_per_group(Group, Config) when
Group =:= single;
Group =:= sentinel;
Group =:= cluster
->
[{redis_type, Group} | Config];
init_per_group(_Group, Config) ->
Config.
end_per_group(_Group, _Config) ->
ok.
init_per_testcase(TestCase, Config) ->
common_init_per_testcase(TestCase, Config).
common_init_per_testcase(TestCase, Config) ->
ct:timetrap(timer:seconds(60)),
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
emqx_config:delete_override_conf_files(),
UniqueNum = integer_to_binary(erlang:unique_integer()),
Name = iolist_to_binary([atom_to_binary(TestCase), UniqueNum]),
Username = <<"test_user">>,
Password = <<"test_passwd">>,
Passfile = filename:join(?config(priv_dir, Config), "passfile"),
ok = file:write_file(Passfile, Password),
NConfig = [
{redis_username, Username},
{redis_password, Password},
{redis_passfile, Passfile}
| Config
],
Path = group_path(Config),
ct:comment(Path),
ConnectorConfig = connector_config(Name, Path, NConfig),
BridgeConfig = action_config(Name, Path, Name),
ok = snabbkaffe:start_trace(),
[
{connector_type, ?CONNECTOR_TYPE},
{connector_name, Name},
{connector_config, ConnectorConfig},
{bridge_type, ?BRIDGE_TYPE},
{bridge_name, Name},
{bridge_config, BridgeConfig}
| NConfig
].
end_per_testcase(_Testcase, Config) ->
case proplists:get_bool(skip_does_not_apply, Config) of
true ->
ok;
false ->
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
emqx_common_test_helpers:call_janitor(60_000),
ok = snabbkaffe:stop(),
ok
end.
%%------------------------------------------------------------------------------
%% Helper fns
%%------------------------------------------------------------------------------
connector_config(Name, Path, Config) ->
[RedisType, _Transport | _] = Path,
Username = ?config(redis_username, Config),
PassFile = ?config(redis_passfile, Config),
CommonCfg = #{
<<"enable">> => true,
<<"description">> => <<"redis connector">>,
<<"parameters">> => #{
<<"password">> => iolist_to_binary(["file://", PassFile]),
<<"pool_size">> => 8,
<<"username">> => Username
},
<<"resource_opts">> => #{
<<"health_check_interval">> => <<"15s">>,
<<"start_after_created">> => true,
<<"start_timeout">> => <<"5s">>
}
},
PerTypeCfg = per_type_connector_config(RedisType),
InnerConfigMap0 = emqx_utils_maps:deep_merge(CommonCfg, PerTypeCfg),
InnerConfigMap = serde_roundtrip(InnerConfigMap0),
parse_and_check_connector_config(InnerConfigMap, Name).
per_type_connector_config(single) ->
#{
<<"parameters">> =>
#{
<<"database">> => <<"0">>,
<<"server">> => <<"redis:6379">>,
<<"redis_type">> => <<"single">>
}
};
per_type_connector_config(sentinel) ->
#{
<<"parameters">> =>
#{
<<"database">> => <<"0">>,
<<"servers">> => <<"redis-sentinel:26379">>,
<<"sentinel">> => <<"mytcpmaster">>,
<<"redis_type">> => <<"sentinel">>
}
};
per_type_connector_config(cluster) ->
#{
<<"parameters">> =>
#{
<<"servers">> =>
<<"redis-cluster-1:6379,redis-cluster-2:6379,redis-cluster-3:6379">>,
<<"redis_type">> => <<"cluster">>
}
}.
parse_and_check_connector_config(InnerConfigMap, Name) ->
TypeBin = ?CONNECTOR_TYPE_BIN,
RawConf = #{<<"connectors">> => #{TypeBin => #{Name => InnerConfigMap}}},
#{<<"connectors">> := #{TypeBin := #{Name := Config}}} =
hocon_tconf:check_plain(emqx_connector_schema, RawConf, #{
required => false, atom_key => false
}),
ct:pal("parsed config: ~p", [Config]),
InnerConfigMap.
action_config(Name, Path, ConnectorId) ->
[RedisType, _Transport | _] = Path,
CommonCfg =
#{
<<"enable">> => true,
<<"connector">> => ConnectorId,
<<"parameters">> =>
#{<<"command_template">> => [<<"RPUSH">>, <<"MSGS/${topic}">>, <<"${payload}">>]},
<<"local_topic">> => <<"t/redis">>,
<<"resource_opts">> => #{
<<"batch_size">> => 1,
<<"batch_time">> => <<"0ms">>,
<<"buffer_mode">> => <<"memory_only">>,
<<"buffer_seg_bytes">> => <<"10MB">>,
<<"health_check_interval">> => <<"15s">>,
<<"inflight_window">> => 100,
<<"max_buffer_bytes">> => <<"256MB">>,
<<"metrics_flush_interval">> => <<"1s">>,
<<"query_mode">> => <<"sync">>,
<<"request_ttl">> => <<"45s">>,
<<"resume_interval">> => <<"15s">>,
<<"worker_pool_size">> => <<"1">>
}
},
PerTypeCfg = per_type_action_config(RedisType),
InnerConfigMap0 = emqx_utils_maps:deep_merge(CommonCfg, PerTypeCfg),
InnerConfigMap = serde_roundtrip(InnerConfigMap0),
parse_and_check_bridge_config(InnerConfigMap, Name).
per_type_action_config(single) ->
#{<<"redis_type">> => <<"single">>};
per_type_action_config(sentinel) ->
#{<<"redis_type">> => <<"sentinel">>};
per_type_action_config(cluster) ->
#{<<"redis_type">> => <<"cluster">>}.
%% check it serializes correctly
serde_roundtrip(InnerConfigMap0) ->
IOList = hocon_pp:do(InnerConfigMap0, #{}),
{ok, InnerConfigMap} = hocon:binary(IOList),
InnerConfigMap.
parse_and_check_bridge_config(InnerConfigMap, Name) ->
TypeBin = ?BRIDGE_TYPE_BIN,
RawConf = #{<<"bridges">> => #{TypeBin => #{Name => InnerConfigMap}}},
hocon_tconf:check_plain(emqx_bridge_v2_schema, RawConf, #{required => false, atom_key => false}),
InnerConfigMap.
make_message() ->
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
#{
clientid => ClientId,
payload => Payload,
timestamp => 1668602148000
}.
%% return the path (reverse of the stack) of the test groups.
%% root group is discarded.
group_path(Config) ->
case emqx_common_test_helpers:group_path(Config) of
[] ->
undefined;
Path ->
tl(Path)
end.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_start_stop(matrix) ->
{start_stop, [
[single, tcp],
[sentinel, tcp],
[cluster, tcp]
]};
t_start_stop(Config) ->
emqx_bridge_v2_testlib:t_start_stop(Config, redis_bridge_stopped),
ok.
t_create_via_http(matrix) ->
{create_via_http, [
[single, tcp],
[sentinel, tcp],
[cluster, tcp]
]};
t_create_via_http(Config) ->
emqx_bridge_v2_testlib:t_create_via_http(Config),
ok.
t_on_get_status(matrix) ->
{on_get_status, [
[single, tcp],
[sentinel, tcp],
[cluster, tcp]
]};
t_on_get_status(Config) ->
emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
ok.
t_sync_query(matrix) ->
{sync_query, [
[single, tcp],
[sentinel, tcp],
[cluster, tcp]
]};
t_sync_query(Config) ->
ok = emqx_bridge_v2_testlib:t_sync_query(
Config,
fun make_message/0,
fun(Res) -> ?assertMatch({ok, _}, Res) end,
redis_bridge_connector_send_done
),
ok.

View File

@ -143,6 +143,24 @@ readable("epoch_millisecond()") ->
]
}
};
readable("epoch_microsecond()") ->
%% only for swagger
#{
swagger => #{
<<"oneOf">> => [
#{
type => integer,
example => 1640995200000000,
description => <<"epoch-microsecond">>
},
#{
type => string,
example => <<"2022-01-01T00:00:00.000000Z">>,
format => <<"date-time">>
}
]
}
};
readable("duration()") ->
#{
swagger => #{type => string, example => <<"12m">>},

View File

@ -290,7 +290,11 @@ transform_bridge_v1_config_to_action_config(
TopMap = maps:with(TopKeys, ActionMap1),
RestMap = maps:without(TopKeys, ActionMap1),
%% Other parameters should be stuffed into `parameters'
emqx_utils_maps:deep_merge(TopMap, #{<<"parameters">> => RestMap}).
emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
emqx_utils_maps:deep_merge(TopMap, #{<<"parameters">> => RestMap})
).
generate_connector_name(ConnectorsMap, BridgeName, Attempt) ->
ConnectorNameList =

View File

@ -19,6 +19,7 @@
-compile(export_all).
-import(emqx_mgmt_api_test_util, [uri/1]).
-import(emqx_common_test_helpers, [on_exit/1]).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
@ -830,9 +831,9 @@ t_list_disabled_channels(Config) ->
)
),
ActionName = ?BRIDGE_NAME,
ActionParams = (?KAFKA_BRIDGE(ActionName))#{<<"enable">> := true},
ActionParams = (?KAFKA_BRIDGE(ActionName))#{<<"enable">> := false},
?assertMatch(
{ok, 201, #{<<"enable">> := true}},
{ok, 201, #{<<"enable">> := false}},
request_json(
post,
uri(["actions"]),
@ -841,6 +842,23 @@ t_list_disabled_channels(Config) ->
)
),
ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, ?CONNECTOR_NAME),
ActionID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ActionName),
?assertMatch(
{ok, 200, #{
<<"status">> := <<"disconnected">>,
<<"status_reason">> := <<"Not installed">>,
<<"error">> := <<"Not installed">>
}},
request_json(
get,
uri(["actions", ActionID]),
Config
)
),
%% This should be fast even if the connector resource process is unresponsive.
ConnectorResID = emqx_connector_resource:resource_id(?CONNECTOR_TYPE, ?CONNECTOR_NAME),
suspend_connector_resource(ConnectorResID, Config),
try
?assertMatch(
{ok, 200, #{<<"actions">> := [ActionName]}},
request_json(
@ -849,6 +867,10 @@ t_list_disabled_channels(Config) ->
Config
)
),
ok
after
resume_connector_resource(ConnectorResID, Config)
end,
ok.
t_raw_config_response_defaults(Config) ->
@ -987,3 +1009,30 @@ json(B) when is_binary(B) ->
ct:pal("Failed to decode json: ~p~n~p", [Reason, B]),
Error
end.
suspend_connector_resource(ConnectorResID, Config) ->
Node = ?config(node, Config),
Pid = erpc:call(Node, fun() ->
[Pid] = [
Pid
|| {ID, Pid, worker, _} <- supervisor:which_children(emqx_resource_manager_sup),
ID =:= ConnectorResID
],
sys:suspend(Pid),
Pid
end),
on_exit(fun() -> erpc:call(Node, fun() -> catch sys:resume(Pid) end) end),
ok.
resume_connector_resource(ConnectorResID, Config) ->
Node = ?config(node, Config),
erpc:call(Node, fun() ->
[Pid] = [
Pid
|| {ID, Pid, worker, _} <- supervisor:which_children(emqx_resource_manager_sup),
ID =:= ConnectorResID
],
sys:resume(Pid),
ok
end),
ok.

View File

@ -786,7 +786,7 @@ examples_gateway_confs() ->
ocpp_gateway =>
#{
summary => <<"A simple OCPP gateway config">>,
vaule =>
value =>
#{
enable => true,
name => <<"ocpp">>,
@ -926,7 +926,7 @@ examples_update_gateway_confs() ->
ocpp_gateway =>
#{
summary => <<"A simple OCPP gateway config">>,
vaule =>
value =>
#{
enable => true,
enable_stats => true,

View File

@ -321,9 +321,48 @@ stop_listener(GwName, {Type, LisName, ListenOn, Cfg}) ->
end,
StopRet.
stop_listener(GwName, Type, LisName, ListenOn, _Cfg) ->
stop_listener(GwName, Type, LisName, ListenOn, _Cfg) when
Type == tcp;
Type == ssl;
Type == udp;
Type == dtls
->
Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
esockd:close(Name, ListenOn).
esockd:close(Name, ListenOn);
stop_listener(GwName, Type, LisName, ListenOn, _Cfg) when
Type == ws; Type == wss
->
Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
case cowboy:stop_listener(Name) of
ok ->
wait_listener_stopped(ListenOn);
Error ->
Error
end.
wait_listener_stopped(ListenOn) ->
% NOTE
% `cowboy:stop_listener/1` will not close the listening socket explicitly,
% it will be closed by the runtime system **only after** the process exits.
Endpoint = maps:from_list(ip_port(ListenOn)),
case
gen_tcp:connect(
maps:get(ip, Endpoint, loopback),
maps:get(port, Endpoint),
[{active, false}]
)
of
{error, _EConnrefused} ->
%% NOTE
%% We should get `econnrefused` here because acceptors are already dead
%% but don't want to crash if not, because this doesn't make any difference.
ok;
{ok, Socket} ->
%% NOTE
%% Tiny chance to get a connected socket here, when some other process
%% concurrently binds to the same port.
gen_tcp:close(Socket)
end.
-ifndef(TEST).
console_print(Fmt, Args) -> ?ULOG(Fmt, Args).

View File

@ -416,7 +416,7 @@ handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}
log(error, #{msg => "unexpected_sock_closed", reason => Reason}, Channel),
{ok, Channel};
handle_info(Info, Channel) ->
log(error, #{msg => "unexpected_info}", info => Info}, Channel),
log(error, #{msg => "unexpected_info", info => Info}, Channel),
{ok, Channel}.
%%--------------------------------------------------------------------

View File

@ -1,8 +1,6 @@
# emqx-jt808
# JT/T 808 2013 网关数据交换格式
JT/T 808 2013 协议接入网关
该文档定义了 Plugins **emqx_jt808****EMQX** 之间数据交换的格式
该文档定义了 **emqx_jt808****EMQX** 之间数据交换的格式
约定:
- Payload 采用 Json 格式进行组装
@ -75,7 +73,8 @@ Json 结构示例
### 消息体字段对照表
- 终端通用应答 `"msg_id": 1` 0x0001
#### 终端通用应答 `"msg_id": 1` 0x0001
| Field | Json Key name | Value Type | Value Type in Json |
|:----------:|:-------------:|:----------:|:------------------:|
| 应答流水号 | seq | word | integer |
@ -83,7 +82,8 @@ Json 结构示例
| 结果 | result | byte | integer |
- 平台通用应答 `"msg_id": 32769` 0x8001
#### 平台通用应答 `"msg_id": 32769` 0x8001
| Field | Json Key name | Value Type | Value Type in Json |
|:----------:|:-------------:|:----------:|:------------------:|
| 应答流水号 | seq | word | integer |
@ -91,11 +91,13 @@ Json 结构示例
| 结果 | result | byte | integer |
- 终端心跳 `"msg_id": 2` 0x0002
#### 终端心跳 `"msg_id": 2` 0x0002
空 Json
- 补传分包请求 `"msg_id": 32771` 0x8003
#### 补传分包请求 `"msg_id": 32771` 0x8003
| Field | Json Key name | Value Type | Value Type in Json |
|:--------------:|:-------------:|:--------------:|:------------------:|
| 原始消息流水号 | seq | word | integer |
@ -103,7 +105,8 @@ Json 结构示例
| 重传包 ID 列表 | ids | byte(2*length) | list of integer |
- 终端注册 `"msg_id": 256` 0x0100
#### 终端注册 `"msg_id": 256` 0x0100
| Field | Json Key name | Value Type | Value Type in Json |
|:---------:|:--------------:|:----------:|:------------------:|
| 省域 ID | province | word | integer |
@ -115,28 +118,34 @@ Json 结构示例
| 车辆标识 | license_number | string | string |
- 终端注册应答 `"msg_id": 33024` 0x8100
#### 终端注册应答 `"msg_id": 33024` 0x8100
| Field | Json Key name | Value Type | Value Type in Json |
|:----------:|:-------------:|:----------:|:------------------:|
| 应答流水号 | seq | word | integer |
| 结果 | result | byte | integer |
只有成功后才有此字段
| Optional Field | Json Key name | Value Type | Value Type in JSON |
|:--------------:|---------------|------------|--------------------|
| 鉴权码 | auth_code | string | string |
- 终端注销 `"msg_id": 3` 0x0003
#### 终端注销 `"msg_id": 3` 0x0003
空 Json
- 终端鉴权 `"msg_id": 258` 0x0102
#### 终端鉴权 `"msg_id": 258` 0x0102
| Field | Json Key name | Value Type | Value Type in Json |
|:------:|:-------------:|:----------:|:------------------:|
| 鉴权码 | code | string | string |
- 设置终端参数 `"msg_id": 33027` 0x8103
#### 设置终端参数 `"msg_id": 33027` 0x8103
| Field | Json Key name | Value Type | Value Type in Json |
|:----------:|:-------------:|:----------:|:------------------------------------------------------:|
| 参数总数 | length | byte | integer |
@ -147,11 +156,13 @@ Json 结构示例
参数 ID 说明见协议规定.
- 查询终端参数 `"msg_id": 33028` 0x8104
#### 查询终端参数 `"msg_id": 33028` 0x8104
空 Json
- 查询指定终端参数 `"msg_id": 33030` 0x8106
#### 查询指定终端参数 `"msg_id": 33030` 0x8106
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:--------------:|:--------------------------------:|
| 参数总数 | length | byte | integer |
@ -160,7 +171,8 @@ Json 结构示例
参数 ID 列表中元素为 integer
- 查询终端应答参数 `"msg_id": 260` 0x0104
#### 查询终端应答参数 `"msg_id": 260` 0x0104
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------:|:------------------------------------------------------:|
| 应答流水号 | seq | word | integer |
@ -172,18 +184,21 @@ Json 结构示例
参数 ID 说明见协议规定.
- 终端控制 `"msg_id": 33029 ` 0x8105
#### 终端控制 `"msg_id": 33029 ` 0x8105
| Field | Json Key name | Value Type | Value Type in Json |
|:--------:|:-------------:|:----------:|:------------------:|
| 命令字 | command | byte | integer |
| 命令参数 | param | string | string |
- 查询终端属性 `"msg_id": 33031` 0x8107
#### 查询终端属性 `"msg_id": 33031` 0x8107
空 Json
- 查询终端属性应答 `"msg_id": 263` 0x0107
#### 查询终端属性应答 `"msg_id": 263` 0x0107
| Field | Json Key name | Value Type | Value Type in Json |
|:-----------------:|:----------------:|:----------:|:------------------:|
| 终端类型 | type | word | integer |
@ -196,10 +211,11 @@ Json 结构示例
| GNSS 模块属性 | gnss_prop | byte | integer |
| 通信模块属性 | comm_prop | byte | integer |
-- 终端硬件版本号长度、终端固件版本号长度,将被用于二进制报文解析,不向上暴露
- 终端硬件版本号长度、终端固件版本号长度,将被用于二进制报文解析,不向上暴露
- 下发终端升级包 `"msg_id": 33032` 0x8108
#### 下发终端升级包 `"msg_id": 33032` 0x8108
| Field | Json Key name | Value Type | Value Type in Json |
|:--------------:|:-------------:|:----------:|:----------------------:|
| 升级类型 | type | byte | integer |
@ -210,14 +226,16 @@ Json 结构示例
| 升级数据包 | firmware | binary | string(base64 encoded) |
- 终端升级结果通知 `"msg_id": 264` 0x0108
#### 终端升级结果通知 `"msg_id": 264` 0x0108
| Field | Json Key name | Value Type | Value Type in Json |
|:--------:|:-------------:|:----------:|:------------------:|
| 升级类型 | type | byte | integer |
| 升级结果 | result | byte | integer |
- 位置信息汇报 `"msg_id": 512` 0x0200
#### 位置信息汇报 `"msg_id": 512` 0x0200
| Field | Json Key name | Value Type | Value Type in Json |
|:--------------------:|:-------------:|:----------:|:------------------:|
| 报警标志 | alarm | dword | integer |
@ -230,11 +248,13 @@ Json 结构示例
| 时间 | time | bcd(6) | string |
| Optional Field | Json Key name | Value Type | Value Type in JSON |
|:------------------:|:-------------:|:----------:|:------------------:|
| 位置附加信息项列表 | extra | - | map |
%% TODO: refine alarm mroe details
<!-- TODO: refine alarm mroe details -->
- 位置附加信息项列表, 在 `extra`
位置附加信息项列表, 在 `extra`
| Field (附加信息描述) | Json Key name | Value Type | Value Type in Json |
|:---------------------------------:|:---------------:|:----------:|:----------------------:|
| 里程 | mileage | dword | integer |
@ -250,44 +270,51 @@ Json 结构示例
| 无线通信网络信号强度 | rssi | byte | integer |
| GNSS 定位卫星数 | gnss_sat_num | byte | integer |
| 后续自定义信息长度 | custome | - | string(base64 encoded) |
| %% TODO 自定义区域 | | | |
| ## TODO 自定义区域 | | | |
- 超速报警附加信息(长度1或5), 置于 map `overspeed_alarm`
超速报警附加信息(长度1或5), 置于 map `overspeed_alarm`
| Field | Json Key name | Value Type | Value Type in Json |
|:--------:|:-------------:|:----------:|:------------------:|
| 位置类型 | type | byte | integer |
| Optional Field | Json Key name | Value Type | Value Type in JSON |
|:--------------:|:-------------:|:----------:|:------------------:|
| 区域或路段 ID | id | dword | integer |
进出区域/路线报警附加信息, 置于 map `in_out_alarm`
- 进出区域/路线报警附加信息, 置于 map `in_out_alarm`
| Field | Json Key name | Value Type | Value Type in Json |
|:-------------:|:-------------:|:----------:|:------------------:|
| 位置类型 | type | byte | integer |
| 区域或路段 ID | id | dword | integer |
| 方向 | direction | byte | integer |
路段行驶时间不足/过长报警附加信息, 置于 map `path_time_alarm`
- 路段行驶时间不足/过长报警附加信息, 置于 map `path_time_alarm`
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------:|:------------------:|
| 路段 ID | id | dword | integer |
| 路段行驶时间 | time | word | integer |
| 结果 | result | byte | integer |
IO 状态位, 置于 map `io_status`
- IO 状态位, 置于 map `io_status`
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------:|:------------------:|
| 深度休眠状态 | deep_sleep | 1 bit | integer |
| 休眠状态 | sleep | 1 bit | integer |
模拟量, 置于 map `analog`
- 模拟量, 置于 map `analog`
| Field | Json Key name | Value Type | Value Type in Json |
|:--------:|:-------------:|:----------:|:------------------:|
| 模拟量 0 | ad0 | 16 bits | integer |
| 模拟量 1 | ad1 | 16 bits | integer |
扩展车辆信号状态位, 置于 map `extra`
- 扩展车辆信号状态位, 置于 map `extra`
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:---------------:|:----------:|:------------------------------------------:|
| 信号 | signal | - 2 bits | map, `{"low_beam": VAL, "high_beam": VAL}` |
@ -305,7 +332,8 @@ IO 状态位, 置于 map `io_status` 内
| 加热器工作 | heater | 1 bit | integer |
| 离合器状态 | cluth | 1 bit | integer |
信号状态, 置于 map `signal`
- 信号状态, 置于 map `signal`
| Field | Json Key name | Value Type | Value Type in Json |
|:----------:|:-------------:|:----------:|:------------------:|
| 近光灯信号 | low_beam | 1 bit | integer |
@ -313,7 +341,7 @@ IO 状态位, 置于 map `io_status` 内
example:
```
```json
{
"header" : {
"msg_id" : 1,
@ -381,39 +409,45 @@ example:
```
- 位置信息查询 `"msg_id": 33281` 0x8201
#### 位置信息查询 `"msg_id": 33281` 0x8201
空 Json
- 位置信息查询应答 `"msg_id": 513` 0x0201
#### 位置信息查询应答 `"msg_id": 513` 0x0201
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------:|:------------------:|
| 应答流水号 | seq | word | integer |
| 位置信息汇报 | params | - | map |
- 临时位置跟踪控制 `"msg_id": 33282` 0x8202
#### 临时位置跟踪控制 `"msg_id": 33282` 0x8202
| Field | Json Key name | Value Type | Value Type in Json |
|:--------------:|:-------------:|:----------:|:------------------:|
| 时间间隔 | period | word | integer |
| 跟踪位置有效期 | expiry | dword | integer |
- 人工确认报警消息 `"msg_id": 33283` 0x8203
#### 人工确认报警消息 `"msg_id": 33283` 0x8203
| Field | Json Key name | Value Type | Value Type in Json |
|:----------------:|:-------------:|:----------:|:------------------:|
| 报警消息流水号 | seq | word | integer |
| 人工确认报警类型 | type | dword | integer |
- 文本信息下发 `"msg_id": 33536` 0x8300
#### 文本信息下发 `"msg_id": 33536` 0x8300
| Field | Json Key name | Value Type | Value Type in Json |
|:--------:|:-------------:|:----------:|:------------------:|
| 标志 | flag | byte | integer |
| 文本信息 | text | string | string |
- 事件设置 `"msg_id": 33537` 0x8301
#### 事件设置 `"msg_id": 33537` 0x8301
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------:|:-----------------------------------------------------------------:|
| 设置类型 | type | byte | integer |
@ -424,15 +458,17 @@ example:
| 事件内容 | content | string | string |
- 事件报告 `"msg_id": 769` 0x0301
#### 事件报告 `"msg_id": 769` 0x0301
| Field | Json Key name | Value Type | Value Type in Json |
|:-------:|:-------------:|------------|:------------------:|
| 事件 ID | id | byte | integer |
- 提问下发 `"msg_id": 33538` 0x8302
#### 提问下发 `"msg_id": 33538` 0x8302
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------:|:---------------------------------------------------------------:|
|:------------:|:-------------:|:----------:|:--------------------------------------------------------------:|
| 标志 | flag | byte | integer |
| 问题内容长度 | length | byte | integer |
| 问题 | question | string | string |
@ -441,16 +477,18 @@ example:
| 答案内容长度 | len | byte | integer |
| 答案内容 | answer | string | string |
%% TODO: len -> length or other length -> len
<!-- TODO: len -> length or other length -> len -->
#### 提问应答 `"msg_id": 770` 0x0302
- 提问应答 `"msg_id": 770` 0x0302
| Field | Json Key name | Value Type | Value Type in Json |
|:----------:|:-------------:|:----------:|:------------------:|
| 应答流水号 | seq | word | integer |
| 答案 ID | id | byte | integer |
- 信息点播菜单设置 `"msg_id": 33539` 0x8303
#### 信息点播菜单设置 `"msg_id": 33539` 0x8303
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------:|:------------------:|
| 设置类型 | type | byte | integer |
@ -461,14 +499,16 @@ example:
| 信息名称 | info | string | string |
- 信息点播/取消 `"msg_id": 771` 0x0303
#### 信息点播/取消 `"msg_id": 771` 0x0303
| Field | Json Key name | Value Type | Value Type in Json |
|:-------------:|:-------------:|:----------:|:------------------:|
| 信息类型 | id | byte | integer |
| 点拨/取消标志 | flag | byte | integer |
- 信息服务 `"msg_id": 33540` 0x8304
#### 信息服务 `"msg_id": 33540` 0x8304
| Field | Json Key name | Value Type | Value Type in Json |
|:--------:|:-------------:|:----------:|:------------------:|
| 信息类型 | type | byte | integer |
@ -476,14 +516,16 @@ example:
| 信息内容 | info | string | string |
- 电话回拨 `"msg_id": 33792` 0x8400
#### 电话回拨 `"msg_id": 33792` 0x8400
| Field | Json Key name | Value Type | Value Type in Json |
|:--------:|:-------------:|:----------:|:------------------:|
| 标志 | type | byte | integer |
| 电话号码 | phone | string | string |
- `"msg_id": 33793` 0x8401
#### 设置电话本 `"msg_id": 33793` 0x8401
| Field | Json Key name | Value Type | Value Type in Json |
|:----------:|:-------------:|:----------:|:------------------:|
| 设置类型 | type | byte | integer |
@ -496,23 +538,28 @@ example:
| 联系人 | name | string | string |
联系人项示例
`[{"type": TYPE, "phone_len", PH_LEN, "phone": PHONE, "name_len": NAME_LEN, "name": NAME}, ...]`
```json
[{"type": TYPE, "phone_len", PH_LEN, "phone": PHONE, "name_len": NAME_LEN, "name": NAME}, ...]
```
- `"msg_id": 34048` 0x8500
#### 车辆控制 `"msg_id": 34048` 0x8500
| Field | Json Key name | Value Type | Value Type in Json |
|:--------:|:-------------:|:----------:|:------------------:|
| 标志控制 | flag | byte | integer |
- `"msg_id": 1280` 0x0500
#### 车辆控制应答 `"msg_id": 1280` 0x0500
| Field | Json Key name | Value Type | Value Type in Json |
|:------------------:|:-------------:|:----------:|:------------------:|
| 应答流水号 | seq | word | integer |
| 位置信息汇报消息体 | location | map | map of location |
- `"msg_id": 34304` 0x8600
#### 设置圆形区域 `"msg_id": 34304` 0x8600
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:------------------:|:----------:|:------------------:|
| 设置属性 | type | byte | integer |
@ -529,7 +576,8 @@ example:
| 超速持续时间 | overspeed_duration | byte | integer |
区域列表示例
`[{"id": ID,
```json
[{"id": ID,
"flag": FLAG,
"center_latitude": CEN_LAT,
"center_longitude": CEN_LON,
@ -540,10 +588,12 @@ example:
"overspeed_duration", OVERSPEED_DURATION
},
...
]`
]
```
- 删除圆形区域 `"msg_id": 34305` 0x8601
#### 删除圆形区域 `"msg_id": 34305` 0x8601
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------:|:------------------:|
| 区域数 | length | byte | integer |
@ -553,7 +603,8 @@ example:
`[ID1, ID2, ...]`
- 设置矩形区域 `"msg_id": 34306` 0x8602
#### 设置矩形区域 `"msg_id": 34306` 0x8602
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:------------------:|:----------:|:------------------------:|
| 设置属性 | type | byte | integer |
@ -571,7 +622,8 @@ example:
| 超速持续时间 | overspeed_duration | byte | integer |
- 删除矩形区域 `"msg_id": 34307` 0x8603
#### 删除矩形区域 `"msg_id": 34307` 0x8603
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------:|:------------------:|
| 区域数 | length | byte | integer |
@ -579,7 +631,8 @@ example:
| 区域 ID 1~n | - | dword | integer |
- 设置多边形区域 `"msg_id": 34308` 0x8604
#### 设置多边形区域 `"msg_id": 34308` 0x8604
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:------------------:|:----------:|:------------------:|
| 区域 ID | id | dword | integer |
@ -594,7 +647,8 @@ example:
| 顶点经度 | lng | dword | integer |
- 删除多边形区域 `"msg_id": 34309` 0x8605
#### 删除多边形区域 `"msg_id": 34309` 0x8605
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------:|:------------------:|
| 区域数 | length | byte | integer |
@ -602,7 +656,8 @@ example:
| 区域 ID 1~n | - | dword | integer |
- 设置路线 `"msg_id": 34310` 0x8606
#### 设置路线 `"msg_id": 34310` 0x8606
| Field | Json Key name | Value Type | Value Type in Json |
|:----------------:|:------------------:|:----------:|:------------------:|
| 路线 ID | id | dword | integer |
@ -623,7 +678,8 @@ example:
| 路段超速持续时间 | overspeed_duration | byte | integer |
- `"msg_id": 34311` 0x8607
#### 删除路线 `"msg_id": 34311` 0x8607
| Field | Json Key name | Value Type | Value Type in Json |
|:--------:|:-------------:|:----------:|:------------------:|
| 路线数 | length | byte | integer |
@ -631,14 +687,16 @@ example:
| 路线 ID | - | dword | integer |
- 行驶记录数据采集命令 `"msg_id": 34560` 0x8700
#### 行驶记录数据采集命令 `"msg_id": 34560` 0x8700
| Field | Json Key name | Value Type | Value Type in Json |
|:------:|:-------------:|:----------------------:|:------------------:|
| 命令字 | command | byte | integer |
| 数据块 | param | string(base64 encoded) | string |
- 行驶记录数据上传 `"msg_id": 1792` 0x0700
#### 行驶记录数据上传 `"msg_id": 1792` 0x0700
| Field | Json Key name | Value Type | Value Type in Json |
|:----------:|:-------------:|:----------------------:|:------------------:|
| 应答流水号 | seq | word | integer |
@ -646,25 +704,29 @@ example:
| 数据块 | data | string(base64 encoded) | string |
- 行驶记录参数下传命令 `"msg_id": 34561` 0x8701
#### 行驶记录参数下传命令 `"msg_id": 34561` 0x8701
| Field | Json Key name | Value Type | Value Type in Json |
|:------:|:-------------:|:----------------------:|:------------------:|
| 命令字 | command | byte | integer |
| 数据块 | param | string(base64 encoded) | string |
- 电子运单上报 `"msg_id": 1793` 0x0701
#### 电子运单上报 `"msg_id": 1793` 0x0701
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------------------:|:------------------:|
| 电子运单长度 | length | dword | integer |
| 电子运单内容 | data | string(base64 encoded) | string |
- 上报驾驶员身份信息请求 `"msg_id": 34562` 0x8702
#### 上报驾驶员身份信息请求 `"msg_id": 34562` 0x8702
空 Json
- 驾驶员身份信息采集上报 `"msg_id": 1794` 0x0702
#### 驾驶员身份信息采集上报 `"msg_id": 1794` 0x0702
| Field | Json Key name | Value Type | Value Type in Json |
|:--------------:|:-------------:|:----------:|:------------------:|
| 状态 | status | byte | integer |
@ -676,7 +738,8 @@ example:
| 证件有效期 | cert_expiry | string | string |
- 定位数据批量上传 `"msg_id": 1796` 0x0704
#### 定位数据批量上传 `"msg_id": 1796` 0x0704
| Field | Json Key name | Value Type | Value Type in Json |
|:--------------:|:-------------:|:----------:|:------------------:|
| 位置数据类型 | type | byte | integer |
@ -684,7 +747,8 @@ example:
| 位置汇报数据项 | location | list | list of location |
- `"msg_id": 1797` 0x0705
#### CAN 总线数据上传 `"msg_id": 1797` 0x0705
| Field | Json Key name | Value Type | Value Type in Json |
|:--------------------:|:-------------:|:----------:|:----------------------:|
| 数据项个数 | length | word | integer |
@ -697,7 +761,8 @@ example:
| CAN 数据 | data | binary | string(base64 encoded) |
- 多媒体时间信息上传 `"msg_id": 2048` 0x0800
#### 多媒体时间信息上传 `"msg_id": 2048` 0x0800
| Field | Json Key name | Value Type | Value Type in Json |
|:--------------:|:-------------:|:----------:|:------------------:|
| 多媒体数据 ID | id | dword | integer |
@ -707,7 +772,8 @@ example:
| 通道 ID | channel | byte | integer |
- 多媒体数据上传 `"msg_id": 2049` 0x0801
#### 多媒体数据上传 `"msg_id": 2049` 0x0801
| Field | Json Key name | Value Type | Value Type in Json |
|:--------------:|:-------------:|:----------:|:----------------------:|
| 多媒体 ID | id | dword | integer |
@ -720,7 +786,8 @@ example:
- 多媒体数据上传应答 `"msg_id": 34816` 0x8800
#### 多媒体数据上传应答 `"msg_id": 34816` 0x8800
| Field | Json Key name | Value Type | Value Type in Json |
|:--------------:|:-------------:|:----------:|:------------------:|
| 多媒体 ID | mm_id | dword | integer |
@ -728,7 +795,8 @@ example:
| 重传包 ID 列表 | retx_ids | list | list of retry IDs |
- 摄像头立即拍摄命令 `"msg_id": 34817` 0x8801
#### 摄像头立即拍摄命令 `"msg_id": 34817` 0x8801
| Field | Json Key name | Value Type | Value Type in Json |
|:-----------------:|:-------------:|:----------:|:------------------:|
| 通道 ID | channel_id | byte | integer |
@ -743,7 +811,8 @@ example:
| 色度 | chromaticity | byte | integer |
- 摄像头立即拍摄应答 `"msg_id": 2053` 0x0805
#### 摄像头立即拍摄应答 `"msg_id": 2053` 0x0805
| Field | Json Key name | Value Type | Value Type in Json |
|:--------------:|:-------------:|:--------------:|:------------------:|
| 应答流水号 | seq | word | integer |
@ -752,7 +821,8 @@ example:
| 多媒体 ID 列表 | ids | byte(4*length) | integer |
- 存储多媒体数据检索 `"msg_id": 34818` 0x8802
#### 存储多媒体数据检索 `"msg_id": 34818` 0x8802
| Field | Json Key name | Value Type | Value Type in Json |
|:----------:|:-------------:|:----------:|:------------------:|
| 多媒体类型 | | byte | |
@ -762,7 +832,8 @@ example:
| 结束时间 | | string | |
- 存储多媒体数据检索应答 `"msg_id": 2050` 0x0802
#### 存储多媒体数据检索应答 `"msg_id": 2050` 0x0802
| Field | Json Key name | Value Type | Value Type in Json |
|:----------------:|:-------------:|:----------:|:---------------------:|
| 应答流水号 | seq | word | integer |
@ -775,7 +846,8 @@ example:
| 位置信息汇报 | location | byte(28) | map |
- 存储多媒体数据上传命令 `"msg_id": 34819` 0x8803
#### 存储多媒体数据上传命令 `"msg_id": 34819` 0x8803
| Field | Json Key name | Value Type | Value Type in Json |
|:----------:|:-------------:|:----------:|:------------------:|
| 多媒体类型 | type | byte | integer |
@ -786,7 +858,8 @@ example:
| 删除标志 | delete | byte | integer |
- 录音开始命令 `"msg_id": 34820` 0x8804
#### 录音开始命令 `"msg_id": 34820` 0x8804
| Field | Json Key name | Value Type | Value Type in Json |
|:----------:|:-------------:|:----------:|:------------------:|
| 录音命令 | command | byte | integer |
@ -795,46 +868,54 @@ example:
| 音频采样率 | rate | byte | integer |
- 单条存储多媒体j叔叔检索上传命令 `"msg_id": 34821` 0x8805
#### 单条存储多媒体j叔叔检索上传命令 `"msg_id": 34821` 0x8805
| Field | Json Key name | Value Type | Value Type in Json |
|:---------:|:-------------:|:----------:|:------------------:|
| 多媒体 ID | id | dword | integer |
| 删除标志 | flag | byte | integer |
- 数据下行透传 `"msg_id": 35072` 0x8900
#### 数据下行透传 `"msg_id": 35072` 0x8900
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------:|:----------------------:|
| 透传消息类型 | type | byte | integer |
| 透传消息内容 | data | binary | string(base64 encoded) |
- 数据上行透传 `"msg_id": 2304` 0x0900
#### 数据上行透传 `"msg_id": 2304` 0x0900
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------:|:----------------------:|
| 透传消息类型 | type | byte | integer |
| 透传消息内容 | data | binary | string(base64 encoded) |
- 数据压缩上报 `"msg_id": 2305` 0x0901
#### 数据压缩上报 `"msg_id": 2305` 0x0901
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------:|:----------------------:|
| 压缩消息长度 | length | dword | integer |
| 压缩消息体 | data | binary | string(base64 encoded) |
- 平台 RSA 公钥 `"msg_id": 35328` 0x8A00
#### 平台 RSA 公钥 `"msg_id": 35328` 0x8A00
| Field | Json Key name | Value Type | Value Type in Json |
|:-----:|:-------------:|:----------:|:----------------------:|
| e | e | dword | integer |
| n | n | byte(128) | string(base64 encoded) |
- 终端 RSA 公钥 `"msg_id": 2560` 0x0A00
#### 终端 RSA 公钥 `"msg_id": 2560` 0x0A00
| Field | Json Key name | Value Type | Value Type in Json |
|:-----:|:-------------:|:----------:|:----------------------:|
| e | e | dword | integer |
| n | n | byte(128) | string(base64 encoded) |
- 0x8F00 ~ 0x8FFF
- 0x0F00 ~ 0x0FFF
#### 保留 0x8F00 ~ 0x8FFF
#### 保留 0x0F00 ~ 0x0FFF

View File

@ -16,7 +16,7 @@
init(#{allow_anonymous := true}) ->
#auth{registry = undefined, authentication = undefined, allow_anonymous = true};
init(#{registry := Reg, authentication := Auth, allow_anonymous := Anonymous}) ->
init(#{allow_anonymous := Anonymous = false, registry := Reg, authentication := Auth}) ->
#auth{registry = Reg, authentication = Auth, allow_anonymous = Anonymous}.
register(_RegFrame, #auth{registry = undefined, allow_anonymous = true}) ->

View File

@ -48,7 +48,7 @@
%% AuthCode
authcode :: undefined | anonymous | binary(),
%% Keepalive
keepalive,
keepalive :: maybe(emqx_keepalive:keepalive()),
%% Msg SN
msg_sn,
%% Down Topic
@ -85,6 +85,8 @@
-define(INFO_KEYS, [ctx, conninfo, zone, clientid, clientinfo, session, conn_state, authcode]).
-define(DN_TOPIC_SUBOPTS, #{rap => 0, nl => 0, qos => 0, rh => 0}).
-define(RETX_INTERVAL, 8000).
-define(RETX_MAX_TIME, 5).
@ -115,15 +117,28 @@ info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
ClientId;
info(clientinfo, #channel{clientinfo = ClientInfo}) ->
ClientInfo;
info(session, _) ->
#{};
info(session, #channel{session = Session}) ->
Session;
info(conn_state, #channel{conn_state = ConnState}) ->
ConnState;
info(authcode, #channel{authcode = AuthCode}) ->
AuthCode.
stats(_Channel) ->
[].
-spec stats(channel()) -> emqx_types:stats().
stats(#channel{inflight = Inflight, mqueue = Queue}) ->
%% XXX: A fake stats for managed by emqx_management
[
{subscriptions_cnt, 1},
{subscriptions_max, 1},
{inflight_cnt, emqx_inflight:size(Inflight)},
{inflight_max, emqx_inflight:max_size(Inflight)},
{mqueue_len, queue:len(Queue)},
{mqueue_max, 0},
{mqueue_dropped, 0},
{next_pkt_id, 0},
{awaiting_rel_cnt, 0},
{awaiting_rel_max, 0}
].
%%--------------------------------------------------------------------
%% Init the Channel
@ -138,7 +153,7 @@ init(
Options = #{
ctx := Ctx,
message_queue_len := MessageQueueLen,
proto := ProtoConf
proto := #{auth := Auth} = ProtoConf
}
) ->
% TODO: init rsa_key from user input
@ -173,12 +188,12 @@ init(
conn_state = idle,
timers = #{},
authcode = undefined,
keepalive = maps:get(keepalive, Options, ?DEFAULT_KEEPALIVE),
keepalive = undefined,
msg_sn = 0,
% TODO: init rsa_key from user input
dn_topic = maps:get(dn_topic, ProtoConf, ?DEFAULT_DN_TOPIC),
up_topic = maps:get(up_topic, ProtoConf, ?DEFAULT_UP_TOPIC),
auth = emqx_jt808_auth:init(ProtoConf),
auth = emqx_jt808_auth:init(Auth),
inflight = emqx_inflight:new(128),
mqueue = queue:new(),
max_mqueue_len = MessageQueueLen,
@ -228,9 +243,8 @@ do_handle_in(Frame = ?MSG(?MC_REGISTER), Channel0) ->
#{<<"header">> := #{<<"msg_sn">> := MsgSn}} = Frame,
case emqx_jt808_auth:register(Frame, Channel0#channel.auth) of
{ok, Authcode} ->
Channel = enrich_clientinfo(
Frame, enrich_conninfo(Frame, Channel0#channel{authcode = Authcode})
),
{ok, Conninfo} = enrich_conninfo(Frame, Channel0#channel{authcode = Authcode}),
{ok, Channel} = enrich_clientinfo(Frame, Conninfo),
handle_out({?MS_REGISTER_ACK, 0}, MsgSn, Channel);
{error, Reason} ->
?SLOG(error, #{msg => "register_failed", reason => Reason}),
@ -243,25 +257,26 @@ do_handle_in(Frame = ?MSG(?MC_REGISTER), Channel0) ->
end;
do_handle_in(Frame = ?MSG(?MC_AUTH), Channel0) ->
#{<<"header">> := #{<<"msg_sn">> := MsgSn}} = Frame,
Channel =
#channel{clientinfo = #{clientid := ClientId}} =
enrich_clientinfo(Frame, enrich_conninfo(Frame, Channel0)),
authack(
case authenticate(Frame, Channel0) of
case
emqx_utils:pipeline(
[
fun enrich_clientinfo/2,
fun enrich_conninfo/2,
fun set_log_meta/2
],
Frame,
Channel0
)
of
{ok, _NFrame, Channel} ->
case authenticate(Frame, Channel) of
true ->
NChannel = prepare_adapter_topic(ensure_connected(Channel)),
emqx_logger:set_metadata_clientid(ClientId),
%% Auto subscribe downlink topics
autosubcribe(NChannel),
_ = start_keepalive(?DEFAULT_KEEPALIVE, NChannel),
%% 0: Successful
{0, MsgSn, NChannel};
NChannel = process_connect(Frame, ensure_connected(Channel)),
authack({0, MsgSn, NChannel});
false ->
?SLOG(error, #{msg => "authenticated_failed"}),
%% 1: Failure
{1, MsgSn, Channel}
authack({1, MsgSn, Channel})
end
);
end;
do_handle_in(Frame = ?MSG(?MC_HEARTBEAT), Channel) ->
handle_out({?MS_GENERAL_RESPONSE, 0, ?MC_HEARTBEAT}, msgsn(Frame), Channel);
do_handle_in(?MSG(?MC_RSA_KEY), Channel = #channel{rsa_key = [E, N]}) ->
@ -428,6 +443,8 @@ handle_call(kick, _From, Channel) ->
disconnect_and_shutdown(kicked, ok, Channel1);
handle_call(discard, _From, Channel) ->
disconnect_and_shutdown(discarded, ok, Channel);
handle_call(subscriptions, _From, Channel = #channel{dn_topic = DnTopic}) ->
reply({ok, [{DnTopic, ?DN_TOPIC_SUBOPTS}]}, Channel);
handle_call(Req, _From, Channel) ->
log(error, #{msg => "unexpected_call", call => Req}, Channel),
reply(ignored, Channel).
@ -464,6 +481,9 @@ handle_info(
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) ->
log(error, #{msg => "unexpected_sock_closed", reason => Reason}, Channel),
{ok, Channel};
handle_info({keepalive, start, Interval}, Channel) ->
NChannel = Channel#channel{keepalive = emqx_keepalive:init(Interval)},
{ok, ensure_timer(alive_timer, NChannel)};
handle_info(Info, Channel) ->
log(error, #{msg => "unexpected_info", info => Info}, Channel),
{ok, Channel}.
@ -615,6 +635,46 @@ maybe_fix_mountpoint(ClientInfo = #{mountpoint := Mountpoint}) ->
Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo),
ClientInfo#{mountpoint := Mountpoint1}.
process_connect(
_Frame,
Channel = #channel{
ctx = Ctx,
conninfo = ConnInfo,
clientinfo = ClientInfo = #{clientid := ClientId}
}
) ->
SessFun = fun(_, _) -> #{} end,
case
emqx_gateway_ctx:open_session(
Ctx,
true,
ClientInfo,
ConnInfo,
SessFun
)
of
{ok, #{session := Session}} ->
NChannel = Channel#channel{session = Session},
%% Auto subscribe downlink topics
ok = autosubcribe(NChannel),
_ = start_keepalive(?DEFAULT_KEEPALIVE, NChannel),
_ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, #{}]),
_ = emqx_gateway_ctx:insert_channel_info(
Ctx, ClientId, info(NChannel), stats(NChannel)
),
NChannel;
{error, Reason} ->
log(
error,
#{
msg => "failed_to_open_session",
reason => Reason
},
Channel
),
shutdown(Reason, Channel)
end.
ensure_connected(
Channel = #channel{
ctx = Ctx,
@ -624,10 +684,7 @@ ensure_connected(
) ->
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
Channel#channel{
conninfo = NConnInfo,
conn_state = connected
}.
prepare_adapter_topic(Channel#channel{conninfo = NConnInfo, conn_state = connected}).
%% Ensure disconnected
ensure_disconnected(
@ -836,7 +893,7 @@ enrich_conninfo(
receive_maximum => 0,
expiry_interval => 0
},
Channel#channel{conninfo = NConnInfo}.
{ok, Channel#channel{conninfo = NConnInfo}}.
%% Register
enrich_clientinfo(
@ -855,7 +912,7 @@ enrich_clientinfo(
manufacturer => Manu,
terminal_id => DevId
}),
Channel#channel{clientinfo = NClientInfo};
{ok, Channel#channel{clientinfo = NClientInfo}};
%% Auth
enrich_clientinfo(
#{<<"header">> := #{<<"phone">> := Phone}},
@ -865,7 +922,11 @@ enrich_clientinfo(
phone => Phone,
clientid => Phone
},
Channel#channel{clientinfo = NClientInfo}.
{ok, Channel#channel{clientinfo = NClientInfo}}.
set_log_meta(_Packet, #channel{clientinfo = #{clientid := ClientId}}) ->
emqx_logger:set_metadata_clientid(ClientId),
ok.
prepare_adapter_topic(Channel = #channel{up_topic = UpTopic, dn_topic = DnTopic}) ->
Channel#channel{
@ -905,9 +966,10 @@ autosubcribe(#channel{
#{clientid := ClientId},
dn_topic = Topic
}) ->
SubOpts = #{rap => 0, nl => 0, qos => 0, rh => 0},
emqx:subscribe(Topic, ClientId, SubOpts),
ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts#{is_new => true}]).
_ = emqx_broker:subscribe(Topic, ClientId, ?DN_TOPIC_SUBOPTS),
ok = emqx_hooks:run('session.subscribed', [
ClientInfo, Topic, ?DN_TOPIC_SUBOPTS#{is_new => true}
]).
start_keepalive(Secs, _Channel) when Secs > 0 ->
self() ! {keepalive, start, round(Secs) * 1000}.

View File

@ -49,36 +49,57 @@ fields(jt808_frame) ->
];
fields(jt808_proto) ->
[
{allow_anonymous, fun allow_anonymous/1},
{registry, fun registry_url/1},
{authentication, fun authentication_url/1},
{auth,
sc(
hoconsc:union([
ref(anonymous_true), ref(anonymous_false)
])
)},
{up_topic, fun up_topic/1},
{dn_topic, fun dn_topic/1}
];
fields(anonymous_true) ->
[
{allow_anonymous,
sc(hoconsc:union([true]), #{desc => ?DESC(allow_anonymous), required => true})}
] ++ fields_reg_auth_required(false);
fields(anonymous_false) ->
[
{allow_anonymous,
sc(hoconsc:union([false]), #{desc => ?DESC(allow_anonymous), required => true})}
] ++ fields_reg_auth_required(true).
fields_reg_auth_required(Required) ->
[
{registry,
sc(binary(), #{
desc => ?DESC(registry_url),
validator => [?NOT_EMPTY("the value of the field 'registry' cannot be empty")],
required => Required
})},
{authentication,
sc(
binary(),
#{
desc => ?DESC(authentication_url),
validator => [
?NOT_EMPTY("the value of the field 'authentication' cannot be empty")
],
required => Required
}
)}
].
jt808_frame_max_length(type) -> non_neg_integer();
jt808_frame_max_length(desc) -> ?DESC(?FUNCTION_NAME);
jt808_frame_max_length(default) -> 8192;
jt808_frame_max_length(required) -> false;
jt808_frame_max_length(_) -> undefined.
allow_anonymous(type) -> boolean();
allow_anonymous(desc) -> ?DESC(?FUNCTION_NAME);
allow_anonymous(default) -> true;
allow_anonymous(required) -> false;
allow_anonymous(_) -> undefined.
registry_url(type) -> binary();
registry_url(desc) -> ?DESC(?FUNCTION_NAME);
registry_url(validator) -> [?NOT_EMPTY("the value of the field 'url' cannot be empty")];
registry_url(required) -> false;
registry_url(_) -> undefined.
authentication_url(type) -> binary();
authentication_url(desc) -> ?DESC(?FUNCTION_NAME);
authentication_url(validator) -> [?NOT_EMPTY("the value of the field 'url' cannot be empty")];
authentication_url(required) -> false;
authentication_url(_) -> undefined.
jt808_frame_max_length(type) ->
non_neg_integer();
jt808_frame_max_length(desc) ->
?DESC(?FUNCTION_NAME);
jt808_frame_max_length(default) ->
8192;
jt808_frame_max_length(required) ->
false;
jt808_frame_max_length(_) ->
undefined.
up_topic(type) -> binary();
up_topic(desc) -> ?DESC(?FUNCTION_NAME);

View File

@ -38,43 +38,35 @@
%% <<"jt808/000123456789/000123456789/dn">>
-define(JT808_DN_TOPIC, <<?JT808_MOUNTPOINT, ?JT808_PHONE, "/dn">>).
-define(CONF_DEFAULT, <<
"\n"
"gateway.jt808 {\n"
" listeners.tcp.default {\n"
" bind = "
?PORT_STR
"\n"
" }\n"
" proto {\n"
" allow_anonymous = false\n"
" registry = "
"\""
?PROTO_REG_SERVER_HOST
?PROTO_REG_REGISTRY_PATH
"\"\n"
" authentication = "
"\""
?PROTO_REG_SERVER_HOST
?PROTO_REG_AUTH_PATH
"\"\n"
" }\n"
"}\n"
>>).
%% erlfmt-ignore
-define(CONF_DEFAULT, <<"
gateway.jt808 {
listeners.tcp.default {
bind = ", ?PORT_STR, "
}
proto {
auth {
allow_anonymous = false
registry = \"", ?PROTO_REG_SERVER_HOST, ?PROTO_REG_REGISTRY_PATH, "\"
authentication = \"", ?PROTO_REG_SERVER_HOST, ?PROTO_REG_AUTH_PATH, "\"
}
}
}
">>).
-define(CONF_ANONYMOUS, <<
"\n"
"gateway.jt808 {\n"
" listeners.tcp.default {\n"
" bind = "
?PORT_STR
"\n"
" }\n"
" proto {\n"
" allow_anonymous = true\n"
" }\n"
"}\n"
>>).
%% erlfmt-ignore
-define(CONF_ANONYMOUS, <<"
gateway.jt808 {
listeners.tcp.default {
bind = ", ?PORT_STR, "
}
proto {
auth {
allow_anonymous = true
}
}
}
">>).
all() ->
emqx_common_test_helpers:all(?MODULE).
@ -88,6 +80,12 @@ end_per_suite(_Config) ->
init_per_testcase(Case = t_case02_anonymous_register_and_auth, Config) ->
Apps = boot_apps(Case, ?CONF_ANONYMOUS, Config),
[{suite_apps, Apps} | Config];
init_per_testcase(Case, Config) when
Case =:= t_create_ALLOW_invalid_auth_config;
Case =:= t_create_DISALLOW_invalid_auth_config
->
Apps = boot_apps(Case, <<>>, Config),
[{suite_apps, Apps} | Config];
init_per_testcase(Case, Config) ->
Apps = boot_apps(Case, ?CONF_DEFAULT, Config),
[{suite_apps, Apps} | Config].
@ -324,6 +322,14 @@ location_report_28bytes() ->
binary_to_hex_string(Data) ->
lists:flatten([io_lib:format("~2.16.0B ", [X]) || <<X:8>> <= Data]).
receive_msg() ->
receive
{deliver, Topic, #message{payload = Payload}} ->
{Topic, Payload}
after 100 ->
{error, timeout}
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%% test cases %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
t_case00_register(_) ->
@ -2678,10 +2684,73 @@ t_case34_dl_0x8805_single_mm_data_ctrl(_Config) ->
ok = gen_tcp:close(Socket).
receive_msg() ->
receive
{deliver, Topic, #message{payload = Payload}} ->
{Topic, Payload}
after 100 ->
{error, timeout}
end.
t_create_ALLOW_invalid_auth_config(_Config) ->
test_invalid_config(create, true).
t_create_DISALLOW_invalid_auth_config(_Config) ->
test_invalid_config(create, false).
t_update_ALLOW_invalid_auth_config(_Config) ->
test_invalid_config(update, true).
t_update_DISALLOW_invalid_auth_config(_Config) ->
test_invalid_config(update, false).
test_invalid_config(CreateOrUpdate, AnonymousAllowed) ->
InvalidConfig = raw_jt808_config(AnonymousAllowed),
UpdateResult = create_or_update(CreateOrUpdate, InvalidConfig),
?assertMatch(
{error, #{
kind := validation_error,
reason := matched_no_union_member,
path := "gateway.jt808.proto.auth"
}},
UpdateResult
).
create_or_update(create, InvalidConfig) ->
emqx_gateway_conf:load_gateway(jt808, InvalidConfig);
create_or_update(update, InvalidConfig) ->
emqx_gateway_conf:update_gateway(jt808, InvalidConfig).
%% Allow: allow anonymous connection, registry and authentication URL not required.
raw_jt808_config(Allow = true) ->
AuthConfig = #{
<<"auth">> => #{
<<"allow_anonymous">> => Allow,
%% registry and authentication `NOT REQUIRED`, but can be configured
<<"registry">> => <<?PROTO_REG_SERVER_HOST, ?PROTO_REG_REGISTRY_PATH>>,
<<"authentication">> => <<?PROTO_REG_SERVER_HOST, ?PROTO_REG_AUTH_PATH>>,
<<"BADKEY_registry_url">> => <<?PROTO_REG_SERVER_HOST, ?PROTO_REG_REGISTRY_PATH>>
}
},
emqx_utils_maps:deep_merge(raw_jt808_config(), #{<<"proto">> => AuthConfig});
%% DisAllow: required registry and authentication URL configuration to auth client.
raw_jt808_config(DisAllow = false) ->
AuthConfig = #{
<<"auth">> => #{
<<"allow_anonymous">> => DisAllow
%% registry and authentication are required but missed here
%%
%% <<"registry">> => <<?PROTO_REG_SERVER_HOST, ?PROTO_REG_REGISTRY_PATH>>,
%% <<"authentication">> => <<?PROTO_REG_SERVER_HOST, ?PROTO_REG_AUTH_PATH>>
}
},
emqx_utils_maps:deep_merge(raw_jt808_config(), #{<<"proto">> => AuthConfig}).
raw_jt808_config() ->
#{
<<"enable">> => true,
<<"enable_stats">> => true,
<<"frame">> => #{<<"max_length">> => 8192},
<<"idle_timeout">> => <<"30s">>,
<<"max_retry_times">> => 3,
<<"message_queue_len">> => 10,
<<"mountpoint">> => <<"jt808/${clientid}/">>,
<<"proto">> =>
#{
<<"dn_topic">> => <<"jt808/${clientid}/${phone}/dn">>,
<<"up_topic">> => <<"jt808/${clientid}/${phone}/up">>
},
<<"retry_interval">> => <<"8s">>
}.

View File

@ -443,6 +443,12 @@ handle_in(
handle_in(?SN_ADVERTISE_MSG(_GwId, _Radius), Channel) ->
% ignore
shutdown(normal, Channel);
%% Ack DISCONNECT even if it is not connected
handle_in(
?SN_DISCONNECT_MSG(_Duration),
Channel = #channel{conn_state = idle}
) ->
handle_out(disconnect, normal, Channel);
handle_in(
Publish =
?SN_PUBLISH_MSG(

View File

@ -176,6 +176,18 @@ t_connect(_) ->
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_disconnect_msg(Socket, undefined),
%% assert: mqttsn gateway will ack disconnect msg with DISCONNECT packet
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket).
t_first_disconnect(_) ->
SockName = {'mqttsn:udp:default', 1884},
?assertEqual(true, lists:keymember(SockName, 1, esockd:listeners())),
{ok, Socket} = gen_udp:open(0, [binary]),
send_disconnect_msg(Socket, undefined),
%% assert: mqttsn gateway will ack disconnect msg with DISCONNECT packet
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket).
@ -1217,7 +1229,7 @@ t_will_case01(_) ->
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
send_disconnect_msg(Socket, undefined),
?assertEqual(udp_receive_timeout, receive_response(Socket)),
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket).
@ -1244,7 +1256,7 @@ t_will_test2(_) ->
receive_response(Socket),
send_disconnect_msg(Socket, undefined),
?assertEqual(udp_receive_timeout, receive_response(Socket)),
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket).
@ -1265,7 +1277,7 @@ t_will_test3(_) ->
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
send_disconnect_msg(Socket, undefined),
?assertEqual(udp_receive_timeout, receive_response(Socket)),
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket).
@ -1294,7 +1306,7 @@ t_will_test4(_) ->
receive_response(Socket),
send_disconnect_msg(Socket, undefined),
?assertEqual(udp_receive_timeout, receive_response(Socket)),
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket).

View File

@ -127,6 +127,8 @@
}
).
-define(DEFAULT_OCPP_DN_SUBOPTS, #{rh => 0, rap => 0, nl => 0, qos => ?QOS_1}).
-dialyzer(no_match).
%%--------------------------------------------------------------------
@ -547,6 +549,13 @@ handle_call(kick, _From, Channel) ->
shutdown(kicked, ok, Channel);
handle_call(discard, _From, Channel) ->
shutdown(discarded, ok, Channel);
handle_call(
subscriptions,
_From,
Channel = #channel{clientinfo = #{clientid := ClientId, mountpoint := Mountpoint}}
) ->
Subs = [{dntopic(ClientId, Mountpoint), ?DEFAULT_OCPP_DN_SUBOPTS}],
reply({ok, Subs}, Channel);
handle_call(Req, From, Channel) ->
?SLOG(error, #{msg => "unexpected_call", req => Req, from => From}),
reply(ignored, Channel).
@ -614,22 +623,6 @@ process_connect(
{error, Reason}
end.
ensure_subscribe_dn_topics(
Channel = #channel{clientinfo = #{clientid := ClientId, mountpoint := Mountpoint} = ClientInfo}
) ->
SubOpts = #{rh => 0, rap => 0, nl => 0, qos => ?QOS_1},
Topic0 = proc_tmpl(
emqx_ocpp_conf:dntopic(),
#{
clientid => ClientId,
cid => ClientId
}
),
Topic = emqx_mountpoint:mount(Mountpoint, Topic0),
ok = emqx_broker:subscribe(Topic, ClientId, SubOpts),
ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]),
Channel.
%%--------------------------------------------------------------------
%% Handle timeout
%%--------------------------------------------------------------------
@ -853,6 +846,28 @@ reset_keepalive(Interval, Channel = #channel{conninfo = ConnInfo, timers = Timer
heartbeat_checking_times_backoff() ->
max(0, emqx_ocpp_conf:heartbeat_checking_times_backoff() - 1).
%%--------------------------------------------------------------------
%% Ensure Subscriptions
ensure_subscribe_dn_topics(
Channel = #channel{clientinfo = #{clientid := ClientId, mountpoint := Mountpoint} = ClientInfo}
) ->
SubOpts = ?DEFAULT_OCPP_DN_SUBOPTS,
Topic = dntopic(ClientId, Mountpoint),
ok = emqx_broker:subscribe(Topic, ClientId, SubOpts),
ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]),
Channel.
dntopic(ClientId, Mountpoint) ->
Topic0 = proc_tmpl(
emqx_ocpp_conf:dntopic(),
#{
clientid => ClientId,
cid => ClientId
}
),
emqx_mountpoint:mount(Mountpoint, Topic0).
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------

View File

@ -89,7 +89,7 @@ feedvar(Path) ->
binary_to_list(
emqx_placeholder:proc_tmpl(
emqx_placeholder:preproc_tmpl(Path),
#{application_priv => code:priv_dir(emqx_ocpp)}
#{application_priv => code:priv_dir(emqx_gateway_ocpp)}
)
).

View File

@ -16,36 +16,132 @@
-module(emqx_ocpp_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
all() ->
emqx_common_test_helpers:all(?MODULE).
-compile(export_all).
-compile(nowarn_export_all).
init_per_suite(Conf) ->
emqx_ct_helpers:start_apps([emqx_gateway_ocpp], fun set_special_cfg/1),
Conf.
-import(
emqx_gateway_test_utils,
[
assert_fields_exist/2,
request/2,
request/3
]
).
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx_gateway_ocpp]).
-define(HEARTBEAT, <<$\n>>).
set_special_cfg(emqx) ->
application:set_env(emqx, allow_anonymous, true),
application:set_env(emqx, enable_acl_cache, false),
LoadedPluginPath = filename:join(["test", "emqx_SUITE_data", "loaded_plugins"]),
application:set_env(
emqx,
plugins_loaded_file,
emqx_ct_helpers:deps_path(emqx, LoadedPluginPath)
);
set_special_cfg(_App) ->
ok.
-define(CONF_DEFAULT, <<
"\n"
"gateway.ocpp {\n"
" mountpoint = \"ocpp/\"\n"
" default_heartbeat_interval = \"60s\"\n"
" heartbeat_checking_times_backoff = 1\n"
" message_format_checking = disable\n"
" upstream {\n"
" topic = \"cp/${clientid}\"\n"
" reply_topic = \"cp/${clientid}/Reply\"\n"
" error_topic = \"cp/${clientid}/Reply\"\n"
" }\n"
" dnstream {\n"
" topic = \"cs/${clientid}\"\n"
" }\n"
" listeners.ws.default {\n"
" bind = \"0.0.0.0:33033\"\n"
" websocket.path = \"/ocpp\"\n"
" }\n"
"}\n"
>>).
all() -> emqx_common_test_helpers:all(?MODULE).
%%--------------------------------------------------------------------
%% Testcases
%%---------------------------------------------------------------------
%% setups
%%--------------------------------------------------------------------
init_per_suite(Config) ->
application:load(emqx_gateway_ocpp),
Apps = emqx_cth_suite:start(
[
{emqx_conf, ?CONF_DEFAULT},
emqx_gateway,
emqx_auth,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
emqx_common_test_http:create_default_app(),
[{suite_apps, Apps} | Config].
end_per_suite(Config) ->
emqx_common_test_http:delete_default_app(),
emqx_cth_suite:stop(?config(suite_apps, Config)),
ok.
default_config() ->
?CONF_DEFAULT.
%%--------------------------------------------------------------------
%% cases
%%--------------------------------------------------------------------
t_update_listeners(_Config) ->
{200, [DefaultListener]} = request(get, "/gateways/ocpp/listeners"),
ListenerConfKeys =
[
id,
type,
name,
enable,
enable_authn,
bind,
acceptors,
max_connections,
max_conn_rate,
proxy_protocol,
proxy_protocol_timeout,
websocket,
tcp_options
],
StatusKeys = [status, node_status],
assert_fields_exist(ListenerConfKeys ++ StatusKeys, DefaultListener),
?assertMatch(
#{
id := <<"ocpp:ws:default">>,
type := <<"ws">>,
name := <<"default">>,
enable := true,
enable_authn := true,
bind := <<"0.0.0.0:33033">>,
websocket := #{path := <<"/ocpp">>}
},
DefaultListener
),
UpdateBody = emqx_utils_maps:deep_put(
[websocket, path],
maps:with(ListenerConfKeys, DefaultListener),
<<"/ocpp2">>
),
{200, _} = request(put, "/gateways/ocpp/listeners/ocpp:ws:default", UpdateBody),
{200, [UpdatedListener]} = request(get, "/gateways/ocpp/listeners"),
?assertMatch(#{websocket := #{path := <<"/ocpp2">>}}, UpdatedListener).
t_enable_disable_gw_ocpp(_Config) ->
AssertEnabled = fun(Enabled) ->
{200, R} = request(get, "/gateways/ocpp"),
E = maps:get(enable, R),
?assertEqual(E, Enabled),
timer:sleep(500),
?assertEqual(E, emqx:get_config([gateway, ocpp, enable]))
end,
?assertEqual({204, #{}}, request(put, "/gateways/ocpp/enable/false", <<>>)),
AssertEnabled(false),
?assertEqual({204, #{}}, request(put, "/gateways/ocpp/enable/true", <<>>)),
AssertEnabled(true).

View File

@ -436,11 +436,11 @@ parse_batch_sql(Key, Query, Acc) ->
end;
select ->
Acc;
Otherwise ->
Type ->
?SLOG(error, #{
msg => "invalid sql statement type",
sql => Query,
type => Otherwise
type => Type
}),
Acc
end.

View File

@ -17,14 +17,17 @@
-behaviour(emqx_config_handler).
-include_lib("emqx/include/logger.hrl").
-define(OPTL, [opentelemetry]).
-define(CERTS_PATH, filename:join(["opentelemetry", "exporter"])).
-define(OTEL_EXPORTER, opentelemetry_exporter).
-define(OTEL_LOG_HANDLER, otel_log_handler).
-define(OTEL_LOG_HANDLER_ID, opentelemetry_handler).
-export([add_handler/0, remove_handler/0]).
-export([post_config_update/5]).
-export([pre_config_update/3, post_config_update/5]).
-export([update/1]).
-export([add_otel_log_handler/0, remove_otel_log_handler/0]).
-export([otel_exporter/1]).
@ -51,6 +54,11 @@ remove_handler() ->
ok = emqx_config_handler:remove_handler(?OPTL),
ok.
pre_config_update(?OPTL, RawConf, RawConf) ->
{ok, RawConf};
pre_config_update(?OPTL, NewRawConf, _RawConf) ->
{ok, convert_certs(NewRawConf)}.
post_config_update(?OPTL, _Req, Old, Old, _AppEnvs) ->
ok;
post_config_update(?OPTL, _Req, New, Old, AppEnvs) ->
@ -85,6 +93,31 @@ otel_exporter(ExporterConf) ->
%% Internal functions
convert_certs(#{<<"exporter">> := ExporterConf} = NewRawConf) ->
NewRawConf#{<<"exporter">> => convert_exporter_certs(ExporterConf)};
convert_certs(#{exporter := ExporterConf} = NewRawConf) ->
NewRawConf#{exporter => convert_exporter_certs(ExporterConf)};
convert_certs(NewRawConf) ->
NewRawConf.
convert_exporter_certs(#{<<"ssl_options">> := SSLOpts} = ExporterConf) ->
ExporterConf#{<<"ssl_options">> => do_convert_certs(SSLOpts)};
convert_exporter_certs(#{ssl_options := SSLOpts} = ExporterConf) ->
ExporterConf#{ssl_options => do_convert_certs(SSLOpts)};
convert_exporter_certs(ExporterConf) ->
ExporterConf.
do_convert_certs(SSLOpts) ->
case emqx_tls_lib:ensure_ssl_files(?CERTS_PATH, SSLOpts) of
{ok, undefined} ->
SSLOpts;
{ok, SSLOpts1} ->
SSLOpts1;
{error, Reason} ->
?SLOG(error, Reason#{msg => "bad_ssl_config", name => "opentelemetry_exporter"}),
throw({bad_ssl_config, Reason})
end.
ensure_otel_metrics(
#{metrics := MetricsConf, exporter := Exporter},
#{metrics := MetricsConf, exporter := Exporter}

View File

@ -28,8 +28,7 @@
-export([
trace_process_publish/3,
start_trace_send/2,
end_trace_send/1,
event/2
end_trace_send/1
]).
-include_lib("emqx/include/emqx.hrl").
@ -37,7 +36,6 @@
-include_lib("opentelemetry_api/include/otel_tracer.hrl").
-define(EMQX_OTEL_CTX, otel_ctx).
-define(IS_ENABLED, emqx_enable).
-define(USER_PROPERTY, 'User-Property').
-define(TRACE_ALL_KEY, {?MODULE, trace_all}).
@ -103,12 +101,11 @@ trace_process_publish(Packet, ChannelInfo, ProcessFun) ->
false ->
ProcessFun(Packet);
RootCtx ->
RootCtx1 = otel_ctx:set_value(RootCtx, ?IS_ENABLED, true),
Attrs = maps:merge(packet_attributes(Packet), channel_attributes(ChannelInfo)),
SpanCtx = otel_tracer:start_span(RootCtx1, ?current_tracer, process_message, #{
SpanCtx = otel_tracer:start_span(RootCtx, ?current_tracer, process_message, #{
attributes => Attrs
}),
Ctx = otel_tracer:set_current_span(RootCtx1, SpanCtx),
Ctx = otel_tracer:set_current_span(RootCtx, SpanCtx),
%% put ctx to packet, so it can be further propagated
Packet1 = put_ctx_to_packet(Ctx, Packet),
_ = otel_ctx:attach(Ctx),
@ -159,17 +156,6 @@ end_trace_send(Packets) ->
packets_list(Packets)
).
%% NOTE: adds an event only within an active span (Otel Ctx must be set in the calling process dict)
-spec event(opentelemetry:event_name(), opentelemetry:attributes_map()) -> ok.
event(Name, Attributes) ->
case otel_ctx:get_value(?IS_ENABLED, false) of
true ->
?add_event(Name, Attributes),
ok;
false ->
ok
end.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

View File

@ -25,6 +25,29 @@
-define(OTEL_API_PATH, emqx_mgmt_api_test_util:api_path(["opentelemetry"])).
-define(CONF_PATH, [opentelemetry]).
-define(CACERT, <<
"-----BEGIN CERTIFICATE-----\n"
"MIIDUTCCAjmgAwIBAgIJAPPYCjTmxdt/MA0GCSqGSIb3DQEBCwUAMD8xCzAJBgNV\n"
"BAYTAkNOMREwDwYDVQQIDAhoYW5nemhvdTEMMAoGA1UECgwDRU1RMQ8wDQYDVQQD\n"
"DAZSb290Q0EwHhcNMjAwNTA4MDgwNjUyWhcNMzAwNTA2MDgwNjUyWjA/MQswCQYD\n"
"VQQGEwJDTjERMA8GA1UECAwIaGFuZ3pob3UxDDAKBgNVBAoMA0VNUTEPMA0GA1UE\n"
"AwwGUm9vdENBMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAzcgVLex1\n"
"EZ9ON64EX8v+wcSjzOZpiEOsAOuSXOEN3wb8FKUxCdsGrsJYB7a5VM/Jot25Mod2\n"
"juS3OBMg6r85k2TWjdxUoUs+HiUB/pP/ARaaW6VntpAEokpij/przWMPgJnBF3Ur\n"
"MjtbLayH9hGmpQrI5c2vmHQ2reRZnSFbY+2b8SXZ+3lZZgz9+BaQYWdQWfaUWEHZ\n"
"uDaNiViVO0OT8DRjCuiDp3yYDj3iLWbTA/gDL6Tf5XuHuEwcOQUrd+h0hyIphO8D\n"
"tsrsHZ14j4AWYLk1CPA6pq1HIUvEl2rANx2lVUNv+nt64K/Mr3RnVQd9s8bK+TXQ\n"
"KGHd2Lv/PALYuwIDAQABo1AwTjAdBgNVHQ4EFgQUGBmW+iDzxctWAWxmhgdlE8Pj\n"
"EbQwHwYDVR0jBBgwFoAUGBmW+iDzxctWAWxmhgdlE8PjEbQwDAYDVR0TBAUwAwEB\n"
"/zANBgkqhkiG9w0BAQsFAAOCAQEAGbhRUjpIred4cFAFJ7bbYD9hKu/yzWPWkMRa\n"
"ErlCKHmuYsYk+5d16JQhJaFy6MGXfLgo3KV2itl0d+OWNH0U9ULXcglTxy6+njo5\n"
"CFqdUBPwN1jxhzo9yteDMKF4+AHIxbvCAJa17qcwUKR5MKNvv09C6pvQDJLzid7y\n"
"E2dkgSuggik3oa0427KvctFf8uhOV94RvEDyqvT5+pgNYZ2Yfga9pD/jjpoHEUlo\n"
"88IGU8/wJCx3Ds2yc8+oBg/ynxG8f/HmCC1ET6EHHoe2jlo8FpU/SgGtghS1YL30\n"
"IWxNsPrUP+XsZpBJy/mvOhE5QXo6Y35zDqqj8tI7AGmAWu22jg==\n"
"-----END CERTIFICATE-----"
>>).
all() ->
emqx_common_test_helpers:all(?MODULE).
@ -250,3 +273,23 @@ t_put_valid(Config) ->
%% alias check
?assertEqual(15_321, emqx:get_config(?CONF_PATH ++ [metrics, interval]))
).
t_put_cert(Config) ->
Auth = ?config(auth, Config),
Path = ?OTEL_API_PATH,
SSL = #{<<"enable">> => true, <<"cacertfile">> => ?CACERT},
SSLDisabled = #{<<"enable">> => false, <<"cacertfile">> => ?CACERT},
Conf = #{<<"exporter">> => #{<<"ssl_options">> => SSL}},
Conf1 = #{<<"exporter">> => #{<<"ssl_options">> => SSLDisabled}},
{ok, Body} = emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, Conf),
#{<<"exporter">> := #{<<"ssl_options">> := #{<<"cacertfile">> := CaFile}}} = emqx_utils_json:decode(
Body
),
ct:pal("CA certfile: ~p", [CaFile]),
?assert(filelib:is_file(CaFile)),
{ok, Body1} = emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, Conf1),
#{<<"exporter">> := #{<<"ssl_options">> := #{<<"cacertfile">> := CaFile1}}} = emqx_utils_json:decode(
Body1
),
ct:pal("CA certfile1: ~p", [CaFile1]),
?assertNot(filelib:is_file(CaFile1)).

View File

@ -159,7 +159,9 @@ on_stop(InstId, State) ->
connector => InstId
}),
close_connections(State),
emqx_resource_pool:stop(InstId).
Res = emqx_resource_pool:stop(InstId),
?tp(postgres_stopped, #{instance_id => InstId}),
Res.
close_connections(#{pool_name := PoolName} = _State) ->
WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
@ -301,6 +303,7 @@ on_query(
Type = pgsql_query_type(TypeOrKey),
{NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State),
Res = on_sql_query(InstId, PoolName, Type, NameOrSQL2, Data),
?tp(postgres_bridge_connector_on_query_return, #{instance_id => InstId, result => Res}),
handle_result(Res).
pgsql_query_type(sql) ->

View File

@ -3,7 +3,7 @@
{erl_opts, [debug_info]}.
{deps, [
%% NOTE: mind ecpool version when updating eredis_cluster version
{eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.8.2"}}},
{eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.8.3"}}},
{emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}}
]}.

View File

@ -62,25 +62,22 @@ roots() ->
fields(redis_single) ->
fields(redis_single_connector) ++
redis_fields() ++
emqx_connector_schema_lib:ssl_fields();
fields(redis_single_connector) ->
[
{server, server()},
redis_type(single)
];
] ++ redis_fields();
fields(redis_cluster) ->
fields(redis_cluster_connector) ++
lists:keydelete(database, 1, redis_fields()) ++
emqx_connector_schema_lib:ssl_fields();
fields(redis_cluster_connector) ->
[
{servers, servers()},
redis_type(cluster)
];
] ++ lists:keydelete(database, 1, redis_fields());
fields(redis_sentinel) ->
fields(redis_sentinel_connector) ++
redis_fields() ++
emqx_connector_schema_lib:ssl_fields();
fields(redis_sentinel_connector) ->
[
@ -91,7 +88,7 @@ fields(redis_sentinel_connector) ->
required => true,
desc => ?DESC("sentinel_desc")
}}
].
] ++ redis_fields().
server() ->
Meta = #{desc => ?DESC("server")},

View File

@ -453,7 +453,12 @@ channel_health_check(ResId, ChannelId) ->
-spec get_channels(resource_id()) -> {ok, [{binary(), map()}]} | {error, term()}.
get_channels(ResId) ->
emqx_resource_manager:get_channels(ResId).
case emqx_resource_manager:lookup_cached(ResId) of
{error, not_found} ->
{error, not_found};
{ok, _Group, _ResourceData = #{mod := Mod}} ->
{ok, emqx_resource:call_get_channels(ResId, Mod)}
end.
set_resource_status_connecting(ResId) ->
emqx_resource_manager:set_resource_status_connecting(ResId).

View File

@ -80,7 +80,7 @@ worker_pool_size_test_() ->
Conf = emqx_utils_maps:deep_put(
[
<<"bridges">>,
<<"http">>,
<<"webhook">>,
<<"simple">>,
<<"resource_opts">>,
<<"worker_pool_size">>
@ -88,7 +88,7 @@ worker_pool_size_test_() ->
BaseConf,
WorkerPoolSize
),
#{<<"bridges">> := #{<<"http">> := #{<<"simple">> := CheckedConf}}} = check(Conf),
#{<<"bridges">> := #{<<"webhook">> := #{<<"simple">> := CheckedConf}}} = check(Conf),
#{<<"resource_opts">> := #{<<"worker_pool_size">> := WPS}} = CheckedConf,
WPS
end,
@ -117,7 +117,7 @@ worker_pool_size_test_() ->
%%===========================================================================
parse_and_check_webhook_bridge(Hocon) ->
#{<<"bridges">> := #{<<"http">> := #{<<"simple">> := Conf}}} = check(parse(Hocon)),
#{<<"bridges">> := #{<<"webhook">> := #{<<"simple">> := Conf}}} = check(parse(Hocon)),
Conf.
parse(Hocon) ->

View File

@ -580,7 +580,8 @@ inc_action_metrics(RuleId, Result) ->
do_inc_action_metrics(RuleId, {error, {recoverable_error, _}}) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
do_inc_action_metrics(RuleId, {error, {unrecoverable_error, _}}) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed');
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown');
do_inc_action_metrics(RuleId, R) ->
case is_ok_result(R) of
false ->

View File

@ -29,6 +29,7 @@
%% API
-export([
to_epoch_millisecond/1,
to_epoch_microsecond/1,
to_epoch_second/1,
human_readable_duration_string/1
]).
@ -54,6 +55,7 @@
%% so the maximum date can reach 9999-12-31 which is ample.
-define(MAXIMUM_EPOCH, 253402214400).
-define(MAXIMUM_EPOCH_MILLI, 253402214400_000).
-define(MAXIMUM_EPOCH_MICROS, 253402214400_000_000).
-define(DATE_PART, [
year,
@ -75,13 +77,16 @@
-reflect_type([
epoch_millisecond/0,
epoch_second/0
epoch_second/0,
epoch_microsecond/0
]).
-type epoch_second() :: non_neg_integer().
-type epoch_millisecond() :: non_neg_integer().
-type epoch_microsecond() :: non_neg_integer().
-typerefl_from_string({epoch_second/0, ?MODULE, to_epoch_second}).
-typerefl_from_string({epoch_millisecond/0, ?MODULE, to_epoch_millisecond}).
-typerefl_from_string({epoch_microsecond/0, ?MODULE, to_epoch_microsecond}).
%%--------------------------------------------------------------------
%% Epoch <-> RFC 3339
@ -93,6 +98,9 @@ to_epoch_second(DateTime) ->
to_epoch_millisecond(DateTime) ->
to_epoch(DateTime, millisecond).
to_epoch_microsecond(DateTime) ->
to_epoch(DateTime, microsecond).
to_epoch(DateTime, Unit) ->
try
case string:to_integer(DateTime) of
@ -131,6 +139,14 @@ validate_epoch(Epoch, second) when Epoch =< ?MAXIMUM_EPOCH ->
{ok, Epoch};
validate_epoch(Epoch, millisecond) when Epoch =< ?MAXIMUM_EPOCH_MILLI ->
{ok, Epoch};
%% http api use millisecond but we should transform to microsecond
validate_epoch(Epoch, microsecond) when
Epoch >= ?MAXIMUM_EPOCH andalso
Epoch =< ?MAXIMUM_EPOCH_MILLI
->
{ok, Epoch * 1000};
validate_epoch(Epoch, microsecond) when Epoch =< ?MAXIMUM_EPOCH_MICROS ->
{ok, Epoch};
validate_epoch(_Epoch, _Unit) ->
{error, bad_epoch}.

View File

@ -28,7 +28,7 @@
-export_type([value/0]).
-type statement_type() :: select | insert | delete.
-type statement_type() :: select | insert | delete | update.
-type value() :: null | binary() | number() | boolean() | [value()].
-dialyzer({no_improper_lists, [escape_mysql/4, escape_prepend/4]}).
@ -38,6 +38,7 @@ get_statement_type(Query) ->
KnownTypes = #{
<<"select">> => select,
<<"insert">> => insert,
<<"update">> => update,
<<"delete">> => delete
},
case re:run(Query, <<"^\\s*([a-zA-Z]+)">>, [{capture, all_but_first, binary}]) of

View File

@ -0,0 +1,4 @@
Fix the issue that the rule engine cannot connect to `upstash` Redis.
Before the fix, after establishing a TCP connection with the Redis service, the Redis driver of EMQX used [Inline Commands](https://redis.io/docs/reference/protocol-spec/#inline-commands) to send AUTH and SELECT commands. However, the `upstash` Redis service does not support Inline Commands, which causes the rule engine to fail to connect to the `upstash` Redis service.
After the fix, the Redis driver of EMQX uses RESP (REdis Serialization Protocol) to send AUTH and SELECT commands.

View File

@ -0,0 +1 @@
Ack the DISCONNECT packet to MQTT-SN client regardless of whether the connection has been successfully established.

View File

@ -14,8 +14,8 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
version: 5.4.0-alpha.1
version: 5.4.0-alpha.2
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application.
appVersion: 5.4.0-alpha.1
appVersion: 5.4.0-alpha.2

View File

@ -14,8 +14,8 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
version: 5.4.0-alpha.1
version: 5.4.0-alpha.2
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application.
appVersion: 5.4.0-alpha.1
appVersion: 5.4.0-alpha.2