Merge branch 'master' into port/slow_subs

This commit is contained in:
JianBo He 2022-02-11 14:37:10 +08:00 committed by GitHub
commit e895de2c5e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
39 changed files with 344 additions and 163 deletions

3
.gitignore vendored
View File

@ -30,7 +30,6 @@ compile_commands.json
cuttlefish
xrefr
*.coverdata
etc/emqx.conf.rendered
Mnesia.*/
*.DS_Store
_checkouts
@ -63,3 +62,5 @@ erlang_ls.config
# elixir
mix.lock
apps/emqx/test/emqx_static_checks_data/
# rendered configurations
*.conf.rendered

View File

@ -119,7 +119,33 @@
quota_timer => expire_quota_limit
}).
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
-define(CHANNEL_METRICS,
[ recv_pkt
, recv_msg
, 'recv_msg.qos0'
, 'recv_msg.qos1'
, 'recv_msg.qos2'
, 'recv_msg.dropped'
, 'recv_msg.dropped.await_pubrel_timeout'
, send_pkt
, send_msg
, 'send_msg.qos0'
, 'send_msg.qos1'
, 'send_msg.qos2'
, 'send_msg.dropped'
, 'send_msg.dropped.expired'
, 'send_msg.dropped.queue_full'
, 'send_msg.dropped.too_large'
]).
-define(INFO_KEYS,
[ conninfo
, conn_state
, clientinfo
, session
, will_msg
]).
-define(LIMITER_ROUTING, message_routing).
-dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}).
@ -184,10 +210,9 @@ set_session(Session, Channel = #channel{conninfo = ConnInfo, clientinfo = Client
Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session),
Channel#channel{session = Session1}.
%% TODO: Add more stats.
-spec(stats(channel()) -> emqx_types:stats()).
stats(#channel{session = Session})->
emqx_session:stats(Session).
lists:append(emqx_session:stats(Session), emqx_pd:get_counters(?CHANNEL_METRICS)).
-spec(caps(channel()) -> emqx_types:caps()).
caps(#channel{clientinfo = #{zone := Zone}}) ->
@ -1437,7 +1462,7 @@ process_alias(Packet = #mqtt_packet{
{ok, Topic} ->
NPublish = Publish#mqtt_packet_publish{topic_name = Topic},
{ok, Packet#mqtt_packet{variable = NPublish}, Channel};
false -> {error, ?RC_PROTOCOL_ERROR}
error -> {error, ?RC_PROTOCOL_ERROR}
end;
process_alias(#mqtt_packet{
@ -1778,7 +1803,7 @@ run_hooks(Name, Args, Acc) ->
-compile({inline, [find_alias/3, save_alias/4]}).
find_alias(_, _, undefined) -> false;
find_alias(_, _, undefined) -> error;
find_alias(inbound, AliasId, _TopicAliases = #{inbound := Aliases}) ->
maps:find(AliasId, Aliases);
find_alias(outbound, Topic, _TopicAliases = #{outbound := Aliases}) ->

View File

@ -131,25 +131,6 @@
, sockstate
]).
-define(CONN_STATS,
[ recv_pkt
, recv_msg
, 'recv_msg.qos0'
, 'recv_msg.qos1'
, 'recv_msg.qos2'
, 'recv_msg.dropped'
, 'recv_msg.dropped.await_pubrel_timeout'
, send_pkt
, send_msg
, 'send_msg.qos0'
, 'send_msg.qos1'
, 'send_msg.qos2'
, 'send_msg.dropped'
, 'send_msg.dropped.expired'
, 'send_msg.dropped.queue_full'
, 'send_msg.dropped.too_large'
]).
-define(SOCK_STATS,
[ recv_oct
, recv_cnt
@ -236,10 +217,9 @@ stats(#state{transport = Transport,
{ok, Ss} -> Ss;
{error, _} -> []
end,
ConnStats = emqx_pd:get_counters(?CONN_STATS),
ChanStats = emqx_channel:stats(Channel),
ProcStats = emqx_misc:proc_stats(),
lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
lists:append([SockStats, ChanStats, ProcStats]).
%% @doc Set TCP keepalive socket options to override system defaults.
%% Idle: The number of seconds a connection needs to be idle before
@ -1030,12 +1010,12 @@ inc_outgoing_stats({error, message_too_large}) ->
inc_counter('send_msg.dropped.too_large', 1);
inc_outgoing_stats(Packet = ?PACKET(Type)) ->
inc_counter(send_pkt, 1),
case Type =:= ?PUBLISH of
true ->
case Type of
?PUBLISH ->
inc_counter(send_msg, 1),
inc_counter(outgoing_pubs, 1),
inc_qos_stats(send_msg, Packet);
false ->
_ ->
ok
end,
emqx_metrics:inc_sent(Packet).

View File

@ -112,7 +112,6 @@
-define(ACTIVE_N, 100).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate]).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
-define(ENABLED(X), (X =/= undefined)).
-define(LIMITER_BYTES_IN, bytes_in).
@ -163,10 +162,9 @@ stats(WsPid) when is_pid(WsPid) ->
call(WsPid, stats);
stats(#state{channel = Channel}) ->
SockStats = emqx_pd:get_counters(?SOCK_STATS),
ConnStats = emqx_pd:get_counters(?CONN_STATS),
ChanStats = emqx_channel:stats(Channel),
ProcStats = emqx_misc:proc_stats(),
lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
lists:append([SockStats, ChanStats, ProcStats]).
%% kick|discard|takeover
-spec(call(pid(), Req :: term()) -> Reply :: term()).
@ -725,6 +723,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
packet => emqx_packet:format(Packet)}),
ok = emqx_metrics:inc('delivery.dropped.too_large'),
ok = emqx_metrics:inc('delivery.dropped'),
ok = inc_outgoing_stats({error, message_too_large}),
<<>>;
Data -> ?TRACE("WS-MQTT", "mqtt_packet_sent", #{packet => Packet}),
ok = inc_outgoing_stats(Packet),
@ -762,19 +761,28 @@ inc_recv_stats(Cnt, Oct) ->
inc_incoming_stats(Packet = ?PACKET(Type)) ->
_ = emqx_pd:inc_counter(recv_pkt, 1),
if Type == ?PUBLISH ->
inc_counter(recv_msg, 1),
inc_counter(incoming_pubs, 1);
true -> ok
case Type of
?PUBLISH ->
inc_counter(recv_msg, 1),
inc_qos_stats(recv_msg, Packet),
inc_counter(incoming_pubs, 1);
_ ->
ok
end,
emqx_metrics:inc_recv(Packet).
inc_outgoing_stats({error, message_too_large}) ->
inc_counter('send_msg.dropped', 1),
inc_counter('send_msg.dropped.too_large', 1);
inc_outgoing_stats(Packet = ?PACKET(Type)) ->
_ = emqx_pd:inc_counter(send_pkt, 1),
if Type == ?PUBLISH ->
inc_counter(send_msg, 1),
inc_counter(outgoing_pubs, 1);
true -> ok
inc_counter(send_pkt, 1),
case Type of
?PUBLISH ->
inc_counter(send_msg, 1),
inc_counter(outgoing_pubs, 1),
inc_qos_stats(send_msg, Packet);
_ ->
ok
end,
emqx_metrics:inc_sent(Packet).
@ -787,6 +795,25 @@ inc_sent_stats(Cnt, Oct) ->
inc_counter(Name, Value) ->
_ = emqx_pd:inc_counter(Name, Value),
ok.
inc_qos_stats(Type, Packet) ->
case inc_qos_stats_key(Type, emqx_packet:qos(Packet)) of
undefined ->
ignore;
Key ->
inc_counter(Key, 1)
end.
inc_qos_stats_key(send_msg, ?QOS_0) -> 'send_msg.qos0';
inc_qos_stats_key(send_msg, ?QOS_1) -> 'send_msg.qos1';
inc_qos_stats_key(send_msg, ?QOS_2) -> 'send_msg.qos2';
inc_qos_stats_key(recv_msg, ?QOS_0) -> 'recv_msg.qos0';
inc_qos_stats_key(recv_msg, ?QOS_1) -> 'recv_msg.qos1';
inc_qos_stats_key(recv_msg, ?QOS_2) -> 'recv_msg.qos2';
%% for bad qos
inc_qos_stats_key(_, _) -> undefined.
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------

View File

@ -883,6 +883,13 @@ t_process_alias(_) ->
{ok, #mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"t">>}}, _Chan} =
emqx_channel:process_alias(#mqtt_packet{variable = Publish}, Channel).
t_process_alias_inexistent_alias(_) ->
Publish = #mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias' => 1}},
Channel = channel(),
?assertEqual(
{error, ?RC_PROTOCOL_ERROR},
emqx_channel:process_alias(#mqtt_packet{variable = Publish}, Channel)).
t_packing_alias(_) ->
Packet1 = #mqtt_packet{variable = #mqtt_packet_publish{
topic_name = <<"x">>,
@ -919,6 +926,20 @@ t_packing_alias(_) ->
#mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"z">>}},
channel())).
t_packing_alias_inexistent_alias(_) ->
Publish = #mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias' => 1}},
Channel = channel(),
Packet = #mqtt_packet{variable = Publish},
ExpectedChannel = emqx_channel:set_field(
topic_aliases,
#{ inbound => #{}
, outbound => #{<<>> => 1}
},
Channel),
?assertEqual(
{Packet, ExpectedChannel},
emqx_channel:packing_alias(Packet, Channel)).
t_check_pub_authz(_) ->
emqx_config:put_zone_conf(default, [authorization, enable], true),
Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>),

View File

@ -49,6 +49,8 @@
, render_config_file/2
, read_schema_configs/2
, load_config/2
, is_tcp_server_available/2
, is_tcp_server_available/3
]).
-define( CERTS_PATH(CertName), filename:join( [ "etc", "certs", CertName ]) ).
@ -111,6 +113,7 @@
] }
]).
-define(DEFAULT_TCP_SERVER_CHECK_AVAIL_TIMEOUT, 1000).
%%------------------------------------------------------------------------------
%% APIs
@ -433,3 +436,19 @@ copy_certs(_, _) -> ok.
load_config(SchemaModule, Config) ->
ok = emqx_config:delete_override_conf_files(),
ok = emqx_config:init_load(SchemaModule, Config).
-spec is_tcp_server_available(Host :: inet:socket_address() | inet:hostname(),
Port :: inet:port_number()) -> boolean.
is_tcp_server_available(Host, Port) ->
is_tcp_server_available(Host, Port, ?DEFAULT_TCP_SERVER_CHECK_AVAIL_TIMEOUT).
-spec is_tcp_server_available(Host :: inet:socket_address() | inet:hostname(),
Port :: inet:port_number(), Timeout :: integer()) -> boolean.
is_tcp_server_available(Host, Port, Timeout) ->
case gen_tcp:connect(Host, Port, [], Timeout) of
{ok, Socket} ->
gen_tcp:close(Socket),
true;
{error, _} ->
false
end.

View File

@ -178,8 +178,9 @@ t_stats(_) ->
end
end),
Stats = ?ws_conn:call(WsPid, stats),
[{recv_oct, 0}, {recv_cnt, 0}, {send_oct, 0}, {send_cnt, 0},
{recv_pkt, 0}, {recv_msg, 0}, {send_pkt, 0}, {send_msg, 0}|_] = Stats.
[?assert(lists:member(V, Stats)) || V <-
[{recv_oct, 0}, {recv_cnt, 0}, {send_oct, 0}, {send_cnt, 0},
{recv_pkt, 0}, {recv_msg, 0}, {send_pkt, 0}, {send_msg, 0}]].
t_call(_) ->
Info = ?ws_conn:info(st()),

View File

@ -47,7 +47,7 @@ end_per_testcase(_TestCase, _Config) ->
init_per_suite(Config) ->
_ = application:load(emqx_conf),
case emqx_authn_test_lib:is_tcp_server_available(?MONGO_HOST, ?MONGO_PORT) of
case emqx_common_test_helpers:is_tcp_server_available(?MONGO_HOST, ?MONGO_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),

View File

@ -43,7 +43,7 @@ init_per_testcase(_TestCase, Config) ->
init_per_suite(Config) ->
_ = application:load(emqx_conf),
case emqx_authn_test_lib:is_tcp_server_available(?MONGO_HOST, ?MONGO_PORT) of
case emqx_common_test_helpers:is_tcp_server_available(?MONGO_HOST, ?MONGO_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),

View File

@ -53,7 +53,7 @@ end_per_group(require_seeds, Config) ->
init_per_suite(Config) ->
_ = application:load(emqx_conf),
case emqx_authn_test_lib:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_PORT) of
case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),

View File

@ -44,7 +44,7 @@ init_per_testcase(_, Config) ->
init_per_suite(Config) ->
_ = application:load(emqx_conf),
case emqx_authn_test_lib:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_PORT) of
case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),

View File

@ -54,7 +54,7 @@ end_per_group(require_seeds, Config) ->
init_per_suite(Config) ->
_ = application:load(emqx_conf),
case emqx_authn_test_lib:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_PORT) of
case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),

View File

@ -44,7 +44,7 @@ init_per_testcase(_, Config) ->
init_per_suite(Config) ->
_ = application:load(emqx_conf),
case emqx_authn_test_lib:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_PORT) of
case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),

View File

@ -53,7 +53,7 @@ end_per_group(require_seeds, Config) ->
init_per_suite(Config) ->
_ = application:load(emqx_conf),
case emqx_authn_test_lib:is_tcp_server_available(?REDIS_HOST, ?REDIS_PORT) of
case emqx_common_test_helpers:is_tcp_server_available(?REDIS_HOST, ?REDIS_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),

View File

@ -44,7 +44,7 @@ init_per_testcase(_, Config) ->
init_per_suite(Config) ->
_ = application:load(emqx_conf),
case emqx_authn_test_lib:is_tcp_server_available(?REDIS_HOST, ?REDIS_PORT) of
case emqx_common_test_helpers:is_tcp_server_available(?REDIS_HOST, ?REDIS_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),

View File

@ -21,8 +21,6 @@
-compile(nowarn_export_all).
-compile(export_all).
-define(DEFAULT_CHECK_AVAIL_TIMEOUT, 1000).
authenticator_example(Id) ->
#{Id := #{value := Example}} = emqx_authn_api:authenticator_examples(),
Example.
@ -57,15 +55,6 @@ delete_config(ID) ->
{delete_authenticator, ?GLOBAL, ID},
#{rawconf_with_defaults => false}).
is_tcp_server_available(Host, Port) ->
case gen_tcp:connect(Host, Port, [], ?DEFAULT_CHECK_AVAIL_TIMEOUT) of
{ok, Socket} ->
gen_tcp:close(Socket),
true;
{error, _} ->
false
end.
client_ssl_cert_opts() ->
Dir = code:lib_dir(emqx_authn, test),
#{keyfile => filename:join([Dir, "data/certs", "client.key"]),

View File

@ -34,7 +34,7 @@ groups() ->
[].
init_per_suite(Config) ->
case emqx_authz_test_lib:is_tcp_server_available(?MONGO_HOST, ?MONGO_PORT) of
case emqx_common_test_helpers:is_tcp_server_available(?MONGO_HOST, ?MONGO_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_authz],

View File

@ -34,7 +34,7 @@ groups() ->
[].
init_per_suite(Config) ->
case emqx_authn_test_lib:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_PORT) of
case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_authz],

View File

@ -34,7 +34,7 @@ groups() ->
[].
init_per_suite(Config) ->
case emqx_authn_test_lib:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_PORT) of
case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_authz],

View File

@ -35,7 +35,7 @@ groups() ->
[].
init_per_suite(Config) ->
case emqx_authn_test_lib:is_tcp_server_available(?REDIS_HOST, ?REDIS_PORT) of
case emqx_common_test_helpers:is_tcp_server_available(?REDIS_HOST, ?REDIS_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_authz],

View File

@ -45,15 +45,6 @@ setup_config(BaseConfig, SpecialParams) ->
{error, Reason} -> {error, Reason}
end.
is_tcp_server_available(Host, Port) ->
case gen_tcp:connect(Host, Port, [], ?DEFAULT_CHECK_AVAIL_TIMEOUT) of
{ok, Socket} ->
gen_tcp:close(Socket),
true;
{error, _} ->
false
end.
test_samples(ClientInfo, Samples) ->
lists:foreach(
fun({Expected, Action, Topic}) ->

View File

@ -18,7 +18,10 @@
-behaviour(minirest_api).
-export([api_spec/0]).
-export([ api_spec/0
, paths/0
, schema/1
]).
-export([auto_subscribe/2]).
@ -29,54 +32,30 @@
-include_lib("emqx/include/emqx_placeholder.hrl").
api_spec() ->
{[auto_subscribe_api()], []}.
emqx_dashboard_swagger:spec(?MODULE).
schema() ->
case emqx_mgmt_api_configs:gen_schema(emqx:get_config([auto_subscribe, topics])) of
#{example := <<>>, type := string} ->
emqx_mgmt_util:schema(auto_subscribe_conf_example());
Example ->
emqx_mgmt_util:schema(Example)
end.
paths() ->
["/mqtt/auto_subscribe"].
auto_subscribe_conf_example() ->
schema("/mqtt/auto_subscribe") ->
#{
type => array,
items => #{
type => object,
properties =>#{
topic => #{
type => string,
example => <<
"/clientid/", ?PH_S_CLIENTID,
"/username/", ?PH_S_USERNAME,
"/host/", ?PH_S_HOST,
"/port/", ?PH_S_PORT>>},
qos => #{example => 0, type => number, enum => [0, 1, 2]},
rh => #{example => 0, type => number, enum => [0, 1, 2]},
nl => #{example => 0, type => number, enum => [0, 1]},
rap => #{example => 0, type => number, enum => [0, 1]}
}
}
}.
auto_subscribe_api() ->
Metadata = #{
'operationId' => auto_subscribe,
get => #{
description => <<"Auto subscribe list">>,
responses => #{
<<"200">> => schema()}},
200 => hoconsc:ref(emqx_auto_subscribe_schema, "auto_subscribe")
}
},
put => #{
description => <<"Update auto subscribe topic list">>,
'requestBody' => schema(),
'requestBody' => hoconsc:ref(emqx_auto_subscribe_schema, "auto_subscribe"),
responses => #{
<<"200">> => schema(),
<<"400">> => emqx_mgmt_util:error_schema(
200 => hoconsc:ref(emqx_auto_subscribe_schema, "auto_subscribe"),
400 => emqx_mgmt_util:error_schema(
<<"Request body required">>, [?BAD_REQUEST]),
<<"409">> => emqx_mgmt_util:error_schema(
409 => emqx_mgmt_util:error_schema(
<<"Auto Subscribe topics max limit">>, [?EXCEED_LIMIT])}}
},
{"/mqtt/auto_subscribe", Metadata, auto_subscribe}.
}.
%%%==============================================================================================
%% api apply

View File

@ -18,6 +18,7 @@
-behaviour(hocon_schema).
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/emqx_placeholder.hrl").
-export([ namespace/0
, roots/0
@ -33,15 +34,19 @@ fields("auto_subscribe") ->
];
fields("topic") ->
[ {topic, sc(binary(), #{})}
, {qos, sc(hoconsc:union([typerefl:integer(0), typerefl:integer(1), typerefl:integer(2)]),
#{default => 0})}
, {rh, sc(hoconsc:union([typerefl:integer(0), typerefl:integer(1), typerefl:integer(2)]),
#{default => 0})}
, {rap, sc(hoconsc:union([typerefl:integer(0), typerefl:integer(1)]), #{default => 0})}
, {nl, sc(hoconsc:union([typerefl:integer(0), typerefl:integer(1)]), #{default => 0})}
[ {topic, sc(binary(), #{example => topic_example()})}
, {qos, sc(emqx_schema:qos(), #{default => 0})}
, {rh, sc(range(0,2), #{default => 0})}
, {rap, sc(range(0, 1), #{default => 0})}
, {nl, sc(range(0, 1), #{default => 0})}
].
topic_example() ->
<<"/clientid/", ?PH_S_CLIENTID,
"/username/", ?PH_S_USERNAME,
"/host/", ?PH_S_HOST,
"/port/", ?PH_S_PORT>>.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

View File

@ -28,7 +28,6 @@
, on_stop/2
, on_query/4
, on_health_check/2
, on_jsonify/1
]).
-export([do_health_check/1]).
@ -43,9 +42,6 @@ roots() ->
%% this schema has no sub-structs
fields(_) -> [].
on_jsonify(Config) ->
Config.
%% ===================================================================
on_start(InstId, #{servers := Servers0,
port := Port,

View File

@ -31,7 +31,6 @@
, on_stop/2
, on_query/4
, on_health_check/2
, on_jsonify/1
]).
%% ecpool callback
@ -97,9 +96,6 @@ mongo_fields() ->
] ++
emqx_connector_schema_lib:ssl_fields().
on_jsonify(Config) ->
Config.
%% ===================================================================
on_start(InstId, Config = #{mongo_type := Type,

View File

@ -25,7 +25,6 @@
, on_stop/2
, on_query/4
, on_health_check/2
, on_jsonify/1
]).
-export([connect/1]).
@ -43,11 +42,6 @@ fields(config) ->
emqx_connector_schema_lib:relational_db_fields() ++
emqx_connector_schema_lib:ssl_fields().
%%=====================================================================
on_jsonify(#{server := Server}= Config) ->
Config#{server => emqx_connector_schema_lib:ip_port_to_string(Server)}.
%% ===================================================================
on_start(InstId, #{server := {Host, Port},
database := DB,

View File

@ -28,7 +28,6 @@
, on_stop/2
, on_query/4
, on_health_check/2
, on_jsonify/1
]).
-export([connect/1]).
@ -49,9 +48,6 @@ fields(config) ->
emqx_connector_schema_lib:relational_db_fields() ++
emqx_connector_schema_lib:ssl_fields().
on_jsonify(#{server := Server}= Config) ->
Config#{server => emqx_connector_schema_lib:ip_port_to_string(Server)}.
named_queries(type) -> map();
named_queries(nullable) -> true;
named_queries(_) -> undefined.

View File

@ -43,7 +43,6 @@
, on_stop/2
, on_query/4
, on_health_check/2
, on_jsonify/1
]).
-export([do_health_check/1]).
@ -85,9 +84,6 @@ fields(sentinel) ->
redis_fields() ++
emqx_connector_schema_lib:ssl_fields().
on_jsonify(Config) ->
Config.
%% ===================================================================
on_start(InstId, #{redis_type := Type,
database := Database,

View File

@ -236,7 +236,7 @@ Template with variables is allowed."""
].
qos() ->
hoconsc:union([typerefl:integer(0), typerefl:integer(1), typerefl:integer(2), binary()]).
hoconsc:union([emqx_schema:qos(), binary()]).
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Field) -> hoconsc:ref(?MODULE, Field).

View File

@ -21,6 +21,7 @@
-include("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include("emqx_dashboard/include/emqx_dashboard.hrl").
%% output functions
-export([ inspect/3

View File

@ -0,0 +1,143 @@
% %%--------------------------------------------------------------------
% %% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
% %%
% %% Licensed under the Apache License, Version 2.0 (the "License");
% %% you may not use this file except in compliance with the License.
% %% You may obtain a copy of the License at
% %% http://www.apache.org/licenses/LICENSE-2.0
% %%
% %% Unless required by applicable law or agreed to in writing, software
% %% distributed under the License is distributed on an "AS IS" BASIS,
% %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
% %% See the License for the specific language governing permissions and
% %% limitations under the License.
% %%--------------------------------------------------------------------
-module(emqx_connector_pgsql_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("stdlib/include/assert.hrl").
-define(PGSQL_HOST, "pgsql").
-define(PGSQL_PORT, 5432).
all() ->
emqx_common_test_helpers:all(?MODULE).
groups() ->
[].
init_per_suite(Config) ->
case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_PORT) of
true ->
Config;
false ->
{skip, no_pgsql}
end.
end_per_suite(_Config) ->
ok.
init_per_testcase(_, Config) ->
?assertEqual(
{ok, #{poolname => emqx_connector_pgsql}},
emqx_connector_pgsql:on_start(<<"emqx_connector_pgsql">>, pgsql_config())
),
Config.
end_per_testcase(_, _Config) ->
?assertEqual(
ok,
emqx_connector_pgsql:on_stop(<<"emqx_connector_pgsql">>, #{poolname => emqx_connector_pgsql})
).
% %%------------------------------------------------------------------------------
% %% Testcases
% %%------------------------------------------------------------------------------
% Simple test to make sure the proper reference to the module is returned.
t_roots(_Config) ->
ExpectedRoots = [{config, #{type => {ref, emqx_connector_pgsql, config}}}],
ActualRoots = emqx_connector_pgsql:roots(),
?assertEqual(ExpectedRoots, ActualRoots).
% Not sure if this level of testing is appropriate for this function.
% Checking the actual values/types of the returned term starts getting
% into checking the emqx_connector_schema_lib.erl returns and the shape
% of expected data elsewhere.
t_fields(_Config) ->
Fields = emqx_connector_pgsql:fields(config),
lists:foreach(
fun({FieldName, FieldValue}) ->
?assert(is_atom(FieldName)),
if
is_map(FieldValue) ->
?assert(maps:is_key(type, FieldValue) and maps:is_key(default, FieldValue));
true ->
?assert(is_function(FieldValue))
end
end,
Fields
).
% Execute a minimal query to validate connection.
t_basic_query(_Config) ->
?assertMatch(
{ok, _, [{1}]},
emqx_connector_pgsql:on_query(
<<"emqx_connector_pgsql">>, {query, test_query()}, undefined, #{
poolname => emqx_connector_pgsql
}
)
).
% Perform health check.
t_do_healthcheck(_Config) ->
?assertEqual(
{ok, #{poolname => emqx_connector_pgsql}},
emqx_connector_pgsql:on_health_check(<<"emqx_connector_pgsql">>, #{
poolname => emqx_connector_pgsql
})
).
% Perform healthcheck on a connector that does not exist.
t_healthceck_when_connector_does_not_exist(_Config) ->
?assertEqual(
{error, health_check_failed, #{poolname => emqx_connector_pgsql_does_not_exist}},
emqx_connector_pgsql:on_health_check(<<"emqx_connector_pgsql_does_not_exist">>, #{
poolname => emqx_connector_pgsql_does_not_exist
})
).
% %%------------------------------------------------------------------------------
% %% Helpers
% %%------------------------------------------------------------------------------
pgsql_config() ->
#{
auto_reconnect => true,
database => <<"mqtt">>,
username => <<"root">>,
password => <<"public">>,
pool_size => 8,
server => {?PGSQL_HOST, ?PGSQL_PORT},
ssl => #{enable => false}
}.
pgsql_bad_config() ->
#{
auto_reconnect => true,
database => <<"bad_mqtt">>,
username => <<"bad_root">>,
password => <<"bad_public">>,
pool_size => 8,
server => {?PGSQL_HOST, ?PGSQL_PORT},
ssl => #{enable => false}
}.
test_query() ->
<<"SELECT 1">>.

View File

@ -73,6 +73,9 @@ fields(ssl_conf) ->
, {keyfile,
sc(binary(),
#{example => <<"{{ platform_etc_dir }}/certs/key.pem">>})}
, {verify,
sc(hoconsc:enum([verify_peer, verify_none]),
#{example => <<"verify_none">>})}
].
%% types

View File

@ -189,7 +189,7 @@ The type of delivered coap message can be set to:<br>
3. qos: Mapping from QoS type of received message, QoS0 -> non, QoS1,2 -> con"
})}
, {subscribe_qos,
sc(hoconsc:union([qos0, qos1, qos2, coap]),
sc(hoconsc:enum([qos0, qos1, qos2, coap]),
#{ default => coap
, desc =>
"The Default QoS Level indicator for subscribe request.<br>
@ -202,7 +202,7 @@ The indicator can be set to:
* qos1: If the subscribe request is confirmable"
})}
, {publish_qos,
sc(hoconsc:union([qos0, qos1, qos2, coap]),
sc(hoconsc:enum([qos0, qos1, qos2, coap]),
#{ default => coap
, desc =>
"The Default QoS Level indicator for publish request.<br>
@ -356,7 +356,7 @@ notifyevents via this topic, if the client reports any resource changes"
fields(translator) ->
[ {topic, sc(binary(), #{nullable => false})}
, {qos, sc(range(0, 2), #{default => 0})}
, {qos, sc(emqx_schema:qos(), #{default => 0})}
];
fields(udp_listeners) ->

View File

@ -136,6 +136,8 @@ parse(<<>>, Parser) ->
parse(Bytes, #{phase := body, length := Len, state := State}) ->
parse(body, Bytes, State, Len);
parse(<<?LF, Bytes/binary>>, #{phase := hdname, state := State}) ->
parse(body, Bytes, State, content_len(State));
parse(Bytes, #{phase := Phase, state := State}) when Phase =/= none ->
parse(Phase, Bytes, State);

View File

@ -385,6 +385,34 @@ t_1000_msg_send(_) ->
lists:foreach(fun(_) -> RecvFun() end, lists:seq(1, 1000))
end).
t_sticky_packets_truncate_after_headers(_) ->
with_connection(fun(Sock) ->
gen_tcp:send(Sock, serialize(<<"CONNECT">>,
[{<<"accept-version">>, ?STOMP_VER},
{<<"host">>, <<"127.0.0.1:61613">>},
{<<"login">>, <<"guest">>},
{<<"passcode">>, <<"guest">>},
{<<"heart-beat">>, <<"0,0">>}])),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, #stomp_frame{command = <<"CONNECTED">>,
headers = _,
body = _}, _, _} = parse(Data),
Topic = <<"/queue/foo">>,
emqx:subscribe(Topic),
gen_tcp:send(Sock, ["SEND\n",
"content-length:3\n",
"destination:/queue/foo\n"]),
timer:sleep(300),
gen_tcp:send(Sock, ["\nfoo",0]),
receive
{deliver, Topic, _Msg}->
ok
after 100 ->
?assert(false, "waiting message timeout")
end
end).
t_rest_clienit_info(_) ->
with_connection(fun(Sock) ->
gen_tcp:send(Sock, serialize(<<"CONNECT">>,

View File

@ -369,13 +369,13 @@ fields(keepalive) ->
fields(subscribe) ->
[
{topic, hoconsc:mk(binary(), #{desc => <<"Access type">>})},
{topic, hoconsc:mk(binary(), #{desc => <<"Topic">>})},
{qos, hoconsc:mk(emqx_schema:qos(), #{desc => <<"QoS">>})}
];
fields(unsubscribe) ->
[
{topic, hoconsc:mk(binary(), #{desc => <<"Access type">>})}
{topic, hoconsc:mk(binary(), #{desc => <<"Topic">>})}
];
fields(meta) ->

View File

@ -70,7 +70,6 @@
-export([ call_start/3 %% start the instance
, call_health_check/3 %% verify if the resource is working normally
, call_stop/3 %% stop the instance
, call_jsonify/2
]).
-export([ list_instances/0 %% list all the instances, id only.
@ -86,11 +85,8 @@
-optional_callbacks([ on_query/4
, on_health_check/2
, on_jsonify/1
]).
-callback on_jsonify(resource_config()) -> jsx:json_term().
%% when calling emqx_resource:start/1
-callback on_start(instance_id(), resource_config()) ->
{ok, resource_state()} | {error, Reason :: term()}.
@ -284,13 +280,6 @@ call_health_check(InstId, Mod, ResourceState) ->
call_stop(InstId, Mod, ResourceState) ->
?SAFE_CALL(Mod:on_stop(InstId, ResourceState)).
-spec call_jsonify(module(), resource_config()) -> jsx:json_term().
call_jsonify(Mod, Config) ->
case erlang:function_exported(Mod, on_jsonify, 1) of
false -> Config;
true -> ?SAFE_CALL(Mod:on_jsonify(Config))
end.
-spec check_config(resource_type(), raw_resource_config()) ->
{ok, resource_config()} | {error, term()}.
check_config(ResourceType, Conf) ->

View File

@ -207,8 +207,7 @@ fields("ctx_disconnected") ->
].
qos() ->
{"qos", sc(hoconsc:union([typerefl:integer(0), typerefl:integer(1), typerefl:integer(2)]),
#{desc => "The Message QoS"})}.
{"qos", sc(emqx_schema:qos(), #{desc => "The Message QoS"})}.
rule_id() ->
{"id", sc(binary(),

View File

@ -194,7 +194,7 @@ outputs() ->
].
qos() ->
hoconsc:union([typerefl:integer(0), typerefl:integer(1), typerefl:integer(2), binary()]).
hoconsc:union([emqx_schema:qos(), binary()]).
validate_sql(Sql) ->
case emqx_rule_sqlparser:parse(Sql) of