Merge remote-tracking branch 'origin/release-54' into sync-r54-m-20231207

This commit is contained in:
Thales Macedo Garitezi 2023-12-07 13:30:04 -03:00
commit f37c86afb9
161 changed files with 2415 additions and 662 deletions

View File

@ -0,0 +1,69 @@
version: '3.9'
services:
jaeger-all-in-one:
image: jaegertracing/all-in-one:1.51.0
container_name: jaeger.emqx.net
hostname: jaeger.emqx.net
networks:
- emqx_bridge
restart: always
# ports:
# - "16686:16686"
user: "${DOCKER_USER:-root}"
# Collector
otel-collector:
image: otel/opentelemetry-collector:0.90.0
container_name: otel-collector.emqx.net
hostname: otel-collector.emqx.net
networks:
- emqx_bridge
restart: always
command: ["--config=/etc/otel-collector-config.yaml", "${OTELCOL_ARGS}"]
volumes:
- ./otel:/etc/
# ports:
# - "1888:1888" # pprof extension
# - "8888:8888" # Prometheus metrics exposed by the collector
# - "8889:8889" # Prometheus exporter metrics
# - "13133:13133" # health_check extension
# - "4317:4317" # OTLP gRPC receiver
# - "4318:4318" # OTLP http receiver
# - "55679:55679" # zpages extension
depends_on:
- jaeger-all-in-one
user: "${DOCKER_USER:-root}"
# Collector
otel-collector-tls:
image: otel/opentelemetry-collector:0.90.0
container_name: otel-collector-tls.emqx.net
hostname: otel-collector-tls.emqx.net
networks:
- emqx_bridge
restart: always
command: ["--config=/etc/otel-collector-config-tls.yaml", "${OTELCOL_ARGS}"]
volumes:
- ./otel:/etc/
- ./certs:/etc/certs
# ports:
# - "14317:4317" # OTLP gRPC receiver
depends_on:
- jaeger-all-in-one
user: "${DOCKER_USER:-root}"
#networks:
# emqx_bridge:
# driver: bridge
# name: emqx_bridge
# enable_ipv6: true
# ipam:
# driver: default
# config:
# - subnet: 172.100.239.0/24
# gateway: 172.100.239.1
# - subnet: 2001:3200:3200::/64
# gateway: 2001:3200:3200::1
#

View File

@ -0,0 +1,6 @@
certs
hostname
hosts
otel-collector.json
otel-collector-tls.json
resolv.conf

View File

@ -0,0 +1,52 @@
receivers:
otlp:
protocols:
grpc:
tls:
ca_file: /etc/certs/ca.crt
cert_file: /etc/certs/server.crt
key_file: /etc/certs/server.key
http:
tls:
ca_file: /etc/certs/ca.crt
cert_file: /etc/certs/server.crt
key_file: /etc/certs/server.key
exporters:
logging:
verbosity: detailed
otlp:
endpoint: jaeger.emqx.net:4317
tls:
insecure: true
debug:
verbosity: detailed
file:
path: /etc/otel-collector-tls.json
processors:
batch:
# send data immediately
timeout: 0
extensions:
health_check:
zpages:
endpoint: :55679
service:
extensions: [zpages, health_check]
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [logging, otlp]
metrics:
receivers: [otlp]
processors: [batch]
exporters: [logging]
logs:
receivers: [otlp]
processors: [batch]
exporters: [logging, file]

View File

@ -0,0 +1,51 @@
receivers:
otlp:
protocols:
grpc:
tls:
# ca_file: /etc/ca.pem
# cert_file: /etc/server.pem
# key_file: /etc/server.key
http:
tls:
# ca_file: /etc/ca.pem
# cert_file: /etc/server.pem
# key_file: /etc/server.key
exporters:
logging:
verbosity: detailed
otlp:
endpoint: jaeger.emqx.net:4317
tls:
insecure: true
debug:
verbosity: detailed
file:
path: /etc/otel-collector.json
processors:
batch:
# send data immediately
timeout: 0
extensions:
health_check:
zpages:
endpoint: :55679
service:
extensions: [zpages, health_check]
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [logging, otlp]
metrics:
receivers: [otlp]
processors: [batch]
exporters: [logging]
logs:
receivers: [otlp]
processors: [batch]
exporters: [logging, file]

4
.gitignore vendored
View File

@ -23,6 +23,8 @@ _build
.rebar3
rebar3.crashdump
.DS_Store
# emqx_ds_replication_layer_meta:ensure_site/0
data
etc/gen.emqx.conf
compile_commands.json
cuttlefish
@ -69,3 +71,5 @@ bom.json
ct_run*/
apps/emqx_conf/etc/emqx.conf.all.rendered*
rebar-git-cache.tar
# build docker image locally
.docker_image_tag

View File

@ -21,7 +21,7 @@ endif
# Dashboard version
# from https://github.com/emqx/emqx-dashboard5
export EMQX_DASHBOARD_VERSION ?= v1.5.2
export EMQX_EE_DASHBOARD_VERSION ?= e1.3.2
export EMQX_EE_DASHBOARD_VERSION ?= e1.4.0-beta.1
PROFILE ?= emqx
REL_PROFILES := emqx emqx-enterprise

View File

@ -98,7 +98,7 @@
#+W w
## Sets time warp mode: no_time_warp | single_time_warp | multi_time_warp
#+C no_time_warp
+C multi_time_warp
## Prevents loading information about source filenames and line numbers.
#+L

View File

@ -32,10 +32,10 @@
%% `apps/emqx/src/bpapi/README.md'
%% Opensource edition
-define(EMQX_RELEASE_CE, "5.3.2").
-define(EMQX_RELEASE_CE, "5.4.0-alpha.1").
%% Enterprise edition
-define(EMQX_RELEASE_EE, "5.3.2").
-define(EMQX_RELEASE_EE, "5.4.0-alpha.1").
%% The HTTP API version
-define(EMQX_API_VERSION, "5.0").

View File

@ -27,10 +27,10 @@
{lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}},
{gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.8"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.16"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.9"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.17.0"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.0"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.40.0"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.40.1"}}},
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
{recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},
@ -45,7 +45,7 @@
{meck, "0.9.2"},
{proper, "1.4.0"},
{bbmustache, "1.10.0"},
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.9.7"}}}
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.10.0"}}}
]},
{extra_src_dirs, [{"test", [recursive]},
{"integration_test", [recursive]}]}

View File

@ -24,7 +24,7 @@ IsQuicSupp = fun() ->
end,
Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}},
Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.303"}}}.
Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.308"}}}.
Dialyzer = fun(Config) ->
{dialyzer, OldDialyzerConfig} = lists:keyfind(dialyzer, 1, Config),

View File

@ -19,6 +19,6 @@
-callback import_config(RawConf :: map()) ->
{ok, #{
root_key => emqx_utils_maps:config_key(),
changed => [emqx_utils_maps:config_path()]
changed => [emqx_utils_maps:config_key_path()]
}}
| {error, #{root_key => emqx_utils_maps:config_key(), reason => term()}}.

View File

@ -11,6 +11,7 @@
gproc,
gen_rpc,
mria,
ekka,
esockd,
cowboy,
sasl,

View File

@ -96,7 +96,7 @@
%% Authentication Data Cache
auth_cache :: maybe(map()),
%% Quota checkers
quota :: emqx_limiter_container:limiter(),
quota :: emqx_limiter_container:container(),
%% Timers
timers :: #{atom() => disabled | maybe(reference())},
%% Conn State
@ -398,8 +398,15 @@ handle_in(?PACKET(_), Channel = #channel{conn_state = ConnState}) when
handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);
handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
case emqx_packet:check(Packet) of
ok -> process_publish(Packet, Channel);
{error, ReasonCode} -> handle_out(disconnect, ReasonCode, Channel)
ok ->
emqx_external_trace:trace_process_publish(
Packet,
%% More info can be added in future, but for now only clientid is used
trace_info(Channel),
fun(PacketWithTrace) -> process_publish(PacketWithTrace, Channel) end
);
{error, ReasonCode} ->
handle_out(disconnect, ReasonCode, Channel)
end;
handle_in(
?PUBACK_PACKET(PacketId, _ReasonCode, Properties),
@ -921,7 +928,11 @@ handle_deliver(
Messages = emqx_session:enrich_delivers(ClientInfo, Delivers1, Session),
NSession = emqx_session_mem:enqueue(ClientInfo, Messages, Session),
{ok, Channel#channel{session = NSession}};
handle_deliver(
handle_deliver(Delivers, Channel) ->
Delivers1 = emqx_external_trace:start_trace_send(Delivers, trace_info(Channel)),
do_handle_deliver(Delivers1, Channel).
do_handle_deliver(
Delivers,
Channel = #channel{
session = Session,
@ -1429,6 +1440,10 @@ overload_protection(_, #channel{clientinfo = #{zone := Zone}}) ->
emqx_olp:backoff(Zone),
ok.
trace_info(Channel) ->
%% More info can be added in future, but for now only clientid is used
maps:from_list(info([clientid], Channel)).
%%--------------------------------------------------------------------
%% Enrich MQTT Connect Info

View File

@ -23,9 +23,7 @@
-export([
trans/2,
trans/3,
lock/1,
lock/2,
unlock/1
]).
@ -33,19 +31,14 @@
start_link() ->
ekka_locker:start_link(?MODULE).
-spec trans(emqx_types:clientid(), fun(([node()]) -> any())) -> any().
trans(ClientId, Fun) ->
trans(ClientId, Fun, undefined).
-spec trans(
maybe(emqx_types:clientid()),
fun(([node()]) -> any()),
ekka_locker:piggyback()
fun(([node()]) -> any())
) -> any().
trans(undefined, Fun, _Piggyback) ->
trans(undefined, Fun) ->
Fun([]);
trans(ClientId, Fun, Piggyback) ->
case lock(ClientId, Piggyback) of
trans(ClientId, Fun) ->
case lock(ClientId) of
{true, Nodes} ->
try
Fun(Nodes)
@ -56,14 +49,10 @@ trans(ClientId, Fun, Piggyback) ->
{error, client_id_unavailable}
end.
-spec lock(emqx_types:clientid()) -> ekka_locker:lock_result().
-spec lock(emqx_types:clientid()) -> {boolean(), [node() | {node(), any()}]}.
lock(ClientId) ->
ekka_locker:acquire(?MODULE, ClientId, strategy()).
-spec lock(emqx_types:clientid(), ekka_locker:piggyback()) -> ekka_locker:lock_result().
lock(ClientId, Piggyback) ->
ekka_locker:acquire(?MODULE, ClientId, strategy(), Piggyback).
-spec unlock(emqx_types:clientid()) -> {boolean(), [node()]}.
unlock(ClientId) ->
ekka_locker:release(?MODULE, ClientId, strategy()).

View File

@ -84,6 +84,7 @@
ok | {ok, Result :: any()} | {error, Reason :: term()}.
-type state() :: #{handlers := any()}.
-type config_key_path() :: emqx_utils_maps:config_key_path().
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, {}, []).
@ -91,21 +92,21 @@ start_link() ->
stop() ->
gen_server:stop(?MODULE).
-spec update_config(module(), emqx_config:config_key_path(), emqx_config:update_args()) ->
-spec update_config(module(), config_key_path(), emqx_config:update_args()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
update_config(SchemaModule, ConfKeyPath, UpdateArgs) ->
%% force convert the path to a list of atoms, as there maybe some wildcard names/ids in the path
AtomKeyPath = [atom(Key) || Key <- ConfKeyPath],
gen_server:call(?MODULE, {change_config, SchemaModule, AtomKeyPath, UpdateArgs}, infinity).
-spec add_handler(emqx_config:config_key_path(), handler_name()) ->
-spec add_handler(config_key_path(), handler_name()) ->
ok | {error, {conflict, list()}}.
add_handler(ConfKeyPath, HandlerName) ->
assert_callback_function(HandlerName),
gen_server:call(?MODULE, {add_handler, ConfKeyPath, HandlerName}).
%% @doc Remove handler asynchronously
-spec remove_handler(emqx_config:config_key_path()) -> ok.
-spec remove_handler(config_key_path()) -> ok.
remove_handler(ConfKeyPath) ->
gen_server:cast(?MODULE, {remove_handler, ConfKeyPath}).
@ -764,7 +765,7 @@ assert_callback_function(Mod) ->
end,
ok.
-spec schema(module(), emqx_utils_maps:config_key_path()) -> hocon_schema:schema().
-spec schema(module(), config_key_path()) -> hocon_schema:schema().
schema(SchemaModule, [RootKey | _]) ->
Roots = hocon_schema:roots(SchemaModule),
{Field, Translations} =

View File

@ -855,9 +855,14 @@ with_channel(Fun, Args, State = #state{channel = Channel}) ->
%%--------------------------------------------------------------------
%% Handle outgoing packets
handle_outgoing(Packets, State) when is_list(Packets) ->
handle_outgoing(Packets, State) ->
Res = do_handle_outgoing(Packets, State),
emqx_external_trace:end_trace_send(Packets),
Res.
do_handle_outgoing(Packets, State) when is_list(Packets) ->
send(lists:map(serialize_and_inc_stats_fun(State), Packets), State);
handle_outgoing(Packet, State) ->
do_handle_outgoing(Packet, State) ->
send((serialize_and_inc_stats_fun(State))(Packet), State).
serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->

View File

@ -59,12 +59,14 @@
-define(DEFAULT_CACHE_CAPACITY, 100).
-define(CONF_KEY_PATH, [crl_cache]).
-type duration() :: non_neg_integer().
-record(state, {
refresh_timers = #{} :: #{binary() => timer:tref()},
refresh_interval = timer:minutes(15) :: timer:time(),
http_timeout = ?HTTP_TIMEOUT :: timer:time(),
refresh_timers = #{} :: #{binary() => reference()},
refresh_interval = timer:minutes(15) :: duration(),
http_timeout = ?HTTP_TIMEOUT :: duration(),
%% keeps track of URLs by insertion time
insertion_times = gb_trees:empty() :: gb_trees:tree(timer:time(), url()),
insertion_times = gb_trees:empty() :: gb_trees:tree(duration(), url()),
%% the set of cached URLs, for testing if an URL is already
%% registered.
cached_urls = sets:new([{version, 2}]) :: sets:set(url()),

View File

@ -0,0 +1,117 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_external_trace).
-callback trace_process_publish(Packet, ChannelInfo, fun((Packet) -> Res)) -> Res when
Packet :: emqx_types:packet(),
ChannelInfo :: channel_info(),
Res :: term().
-callback start_trace_send(list(emqx_types:deliver()), channel_info()) ->
list(emqx_types:deliver()).
-callback end_trace_send(emqx_types:packet() | [emqx_types:packet()]) -> ok.
-callback event(EventName :: term(), Attributes :: term()) -> ok.
-type channel_info() :: #{atom() => _}.
-export([
provider/0,
register_provider/1,
unregister_provider/1,
trace_process_publish/3,
start_trace_send/2,
end_trace_send/1,
event/1,
event/2
]).
-export_type([channel_info/0]).
-define(PROVIDER, {?MODULE, trace_provider}).
-define(with_provider(IfRegistered, IfNotRegistered),
case persistent_term:get(?PROVIDER, undefined) of
undefined ->
IfNotRegistered;
Provider ->
Provider:IfRegistered
end
).
%%--------------------------------------------------------------------
%% provider API
%%--------------------------------------------------------------------
-spec register_provider(module()) -> ok | {error, term()}.
register_provider(Module) when is_atom(Module) ->
case is_valid_provider(Module) of
true ->
persistent_term:put(?PROVIDER, Module);
false ->
{error, invalid_provider}
end.
-spec unregister_provider(module()) -> ok | {error, term()}.
unregister_provider(Module) ->
case persistent_term:get(?PROVIDER, undefined) of
Module ->
persistent_term:erase(?PROVIDER),
ok;
_ ->
{error, not_registered}
end.
-spec provider() -> module() | undefined.
provider() ->
persistent_term:get(?PROVIDER, undefined).
%%--------------------------------------------------------------------
%% trace API
%%--------------------------------------------------------------------
-spec trace_process_publish(Packet, ChannelInfo, fun((Packet) -> Res)) -> Res when
Packet :: emqx_types:packet(),
ChannelInfo :: channel_info(),
Res :: term().
trace_process_publish(Packet, ChannelInfo, ProcessFun) ->
?with_provider(?FUNCTION_NAME(Packet, ChannelInfo, ProcessFun), ProcessFun(Packet)).
-spec start_trace_send(list(emqx_types:deliver()), channel_info()) ->
list(emqx_types:deliver()).
start_trace_send(Delivers, ChannelInfo) ->
?with_provider(?FUNCTION_NAME(Delivers, ChannelInfo), Delivers).
-spec end_trace_send(emqx_types:packet() | [emqx_types:packet()]) -> ok.
end_trace_send(Packets) ->
?with_provider(?FUNCTION_NAME(Packets), ok).
event(Name) ->
event(Name, #{}).
-spec event(term(), term()) -> ok.
event(Name, Attributes) ->
?with_provider(?FUNCTION_NAME(Name, Attributes), ok).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
is_valid_provider(Module) ->
lists:all(
fun({F, A}) -> erlang:function_exported(Module, F, A) end,
?MODULE:behaviour_info(callbacks)
).

View File

@ -960,6 +960,8 @@ serialize_properties(Props) when is_map(Props) ->
serialize_property(_, Disabled) when Disabled =:= disabled; Disabled =:= undefined ->
<<>>;
serialize_property(internal_extra, _) ->
<<>>;
serialize_property('Payload-Format-Indicator', Val) ->
<<16#01, Val>>;
serialize_property('Message-Expiry-Interval', Val) ->

View File

@ -25,7 +25,7 @@
module := ?MODULE,
id := emqx_limiter_schema:limiter_id(),
type := emqx_limiter_schema:limiter_type(),
bucket := hocons:config()
bucket := hocon:config()
}.
%%--------------------------------------------------------------------
@ -35,7 +35,7 @@
-spec new_create_options(
emqx_limiter_schema:limiter_id(),
emqx_limiter_schema:limiter_type(),
hocons:config()
hocon:config()
) -> create_options().
new_create_options(Id, Type, BucketCfg) ->
#{module => ?MODULE, id => Id, type => Type, bucket => BucketCfg}.

View File

@ -32,7 +32,7 @@
make_future/1,
available/1
]).
-export_type([local_limiter/0]).
-export_type([local_limiter/0, limiter/0]).
%% a token bucket limiter which may or not contains a reference to another limiter,
%% and can be used in a client alone

View File

@ -70,7 +70,7 @@
-spec get_limiter_by_types(
limiter_id() | {atom(), atom()},
list(limiter_type()),
#{limiter_type() => hocons:config()}
#{limiter_type() => hocon:config()}
) -> container().
get_limiter_by_types({Type, Listener}, Types, BucketCfgs) ->
Id = emqx_listeners:listener_id(Type, Listener),

View File

@ -77,7 +77,7 @@
start_server(Type) ->
emqx_limiter_server_sup:start(Type).
-spec start_server(limiter_type(), hocons:config()) -> _.
-spec start_server(limiter_type(), hocon:config()) -> _.
start_server(Type, Cfg) ->
emqx_limiter_server_sup:start(Type, Cfg).

View File

@ -112,7 +112,7 @@
-spec connect(
limiter_id(),
limiter_type(),
hocons:config() | undefined
hocon:config() | undefined
) ->
{ok, emqx_htb_limiter:limiter()} | {error, _}.
%% undefined is the default situation, no limiter setting by default
@ -128,7 +128,7 @@ connect(Id, Type, Cfg) ->
maps:get(Type, Cfg, undefined)
).
-spec add_bucket(limiter_id(), limiter_type(), hocons:config() | undefined) -> ok.
-spec add_bucket(limiter_id(), limiter_type(), hocon:config() | undefined) -> ok.
add_bucket(_Id, _Type, undefined) ->
ok;
%% a bucket with an infinity rate shouldn't be added to this server, because it is always full
@ -153,7 +153,7 @@ name(Type) ->
restart(Type) ->
?CALL(Type).
-spec update_config(limiter_type(), hocons:config()) -> ok | {error, _}.
-spec update_config(limiter_type(), hocon:config()) -> ok | {error, _}.
update_config(Type, Config) ->
?CALL(Type, {update_config, Type, Config}).
@ -166,7 +166,7 @@ whereis(Type) ->
%% Starts the server
%% @end
%%--------------------------------------------------------------------
-spec start_link(limiter_type(), hocons:config()) -> _.
-spec start_link(limiter_type(), hocon:config()) -> _.
start_link(Type, Cfg) ->
gen_server:start_link({local, name(Type)}, ?MODULE, [Type, Cfg], []).
@ -500,7 +500,7 @@ init_tree(Type, #{rate := Rate} = Cfg) ->
buckets => #{}
}.
-spec make_root(hocons:confg()) -> root().
-spec make_root(hocon:config()) -> root().
make_root(#{rate := Rate, burst := Burst}) ->
#{
rate => Rate,
@ -554,7 +554,7 @@ do_del_bucket(Id, #{type := Type, buckets := Buckets} = State) ->
State#{buckets := maps:remove(Id, Buckets)}
end.
-spec get_initial_val(hocons:config()) -> decimal().
-spec get_initial_val(hocon:config()) -> decimal().
get_initial_val(
#{
initial := Initial,

View File

@ -47,7 +47,7 @@ start(Type) ->
Spec = make_child(Type),
supervisor:start_child(?MODULE, Spec).
-spec start(emqx_limiter_schema:limiter_type(), hocons:config()) -> _.
-spec start(emqx_limiter_schema:limiter_type(), hocon:config()) -> _.
start(Type, Cfg) ->
Spec = make_child(Type, Cfg),
supervisor:start_child(?MODULE, Spec).

View File

@ -311,7 +311,8 @@ to_packet(
qos = QoS,
headers = Headers,
topic = Topic,
payload = Payload
payload = Payload,
extra = Extra
}
) ->
#mqtt_packet{
@ -324,8 +325,8 @@ to_packet(
variable = #mqtt_packet_publish{
topic_name = Topic,
packet_id = PacketId,
properties = filter_pub_props(
maps:get(properties, Headers, #{})
properties = maybe_put_extra(
Extra, filter_pub_props(maps:get(properties, Headers, #{}))
)
},
payload = Payload
@ -345,6 +346,11 @@ filter_pub_props(Props) ->
Props
).
maybe_put_extra(Extra, Props) when map_size(Extra) > 0 ->
Props#{internal_extra => Extra};
maybe_put_extra(_Extra, Props) ->
Props.
%% @doc Message to map
-spec to_map(emqx_types:message()) -> message_map().
to_map(#message{

View File

@ -452,9 +452,15 @@ to_message(
Headers
) ->
Msg = emqx_message:make(ClientId, QoS, Topic, Payload),
{Extra, Props1} =
case maps:take(internal_extra, Props) of
error -> {#{}, Props};
ExtraProps -> ExtraProps
end,
Msg#message{
flags = #{dup => Dup, retain => Retain},
headers = Headers#{properties => Props}
headers = Headers#{properties => Props1},
extra = Extra
}.
-spec will_msg(#mqtt_packet_connect{}) -> emqx_types:message().

View File

@ -47,13 +47,15 @@
handle_info/2
]).
-export_type([cb_state/0, cb_ret/0]).
-type cb_state() :: #{
%% connecion owner pid
conn_pid := pid(),
%% Pid of ctrl stream
ctrl_pid := undefined | pid(),
%% quic connecion handle
conn := undefined | quicer:conneciton_handle(),
conn := undefined | quicer:connection_handle(),
%% Data streams that handoff from this process
%% these streams could die/close without effecting the connecion/session.
%@TODO type?
@ -85,7 +87,7 @@ init(#{stream_opts := SOpts} = S) when is_list(SOpts) ->
init(ConnOpts) when is_map(ConnOpts) ->
{ok, init_cb_state(ConnOpts)}.
-spec closed(quicer:conneciton_handle(), quicer:conn_closed_props(), cb_state()) ->
-spec closed(quicer:connection_handle(), quicer:conn_closed_props(), cb_state()) ->
{stop, normal, cb_state()}.
closed(_Conn, #{is_peer_acked := _} = Prop, S) ->
?SLOG(debug, Prop),

View File

@ -124,7 +124,7 @@ send_complete(_Stream, false, S) ->
send_complete(_Stream, true = _IsCanceled, S) ->
{ok, S}.
-spec send_shutdown_complete(stream_handle(), error_code(), cb_state()) -> cb_ret().
-spec send_shutdown_complete(stream_handle(), IsGraceful :: boolean(), cb_state()) -> cb_ret().
send_shutdown_complete(_Stream, _Flags, S) ->
{ok, S}.
@ -321,7 +321,7 @@ serialize_packet(Packet, Serialize) ->
-spec init_state(
quicer:stream_handle(),
quicer:connection_handle(),
quicer:new_stream_props()
non_neg_integer()
) ->
% @TODO
map().

View File

@ -3381,7 +3381,7 @@ naive_env_interpolation("$" ++ Maybe = Original) ->
filename:join([Path, Tail]);
error ->
?SLOG(warning, #{
msg => "failed_to_resolve_env_variable",
msg => "cannot_resolve_env_variable",
env => Env,
original => Original
}),

View File

@ -114,7 +114,9 @@
reply/0,
replies/0,
common_timer_name/0,
custom_timer_name/0
custom_timer_name/0,
session_id/0,
session/0
]).
-type session_id() :: _TODO.
@ -150,6 +152,8 @@
await_rel_timeout := timeout()
}.
-type session() :: t().
-type t() ::
emqx_session_mem:session()
| emqx_persistent_session_ds:session().

View File

@ -114,6 +114,8 @@
-export_type([takeover_data/0]).
-export_type([startlink_ret/0]).
-type proto_ver() ::
?MQTT_PROTO_V3
| ?MQTT_PROTO_V4

View File

@ -34,7 +34,7 @@ t_start_link(_) ->
emqx_cm_locker:start_link().
t_trans(_) ->
ok = emqx_cm_locker:trans(undefined, fun(_) -> ok end, []),
ok = emqx_cm_locker:trans(undefined, fun(_) -> ok end),
ok = emqx_cm_locker:trans(<<"clientid">>, fun(_) -> ok end).
t_lock_unlock(_) ->

View File

@ -43,17 +43,28 @@ start_link(Name, Args, Envs, Timeout) when is_atom(Name) ->
do_start(Name0, Args, Envs, Timeout, Func) when is_atom(Name0) ->
{Name, Host} = parse_node_name(Name0),
{ok, Pid, Node} = peer:Func(#{
name => Name,
host => Host,
args => Args,
env => Envs,
wait_boot => Timeout,
longnames => true,
shutdown => {halt, 1000}
}),
true = register(Node, Pid),
{ok, Node}.
%% Create exclusive current directory for the node. Some configurations, like plugin
%% installation directory, are the same for the whole cluster, and nodes on the same
%% machine will step on each other's toes...
{ok, Cwd} = file:get_cwd(),
NodeCwd = filename:join([Cwd, Name]),
ok = filelib:ensure_dir(filename:join([NodeCwd, "dummy"])),
try
file:set_cwd(NodeCwd),
{ok, Pid, Node} = peer:Func(#{
name => Name,
host => Host,
args => Args,
env => Envs,
wait_boot => Timeout,
longnames => true,
shutdown => {halt, 1000}
}),
true = register(Node, Pid),
{ok, Node}
after
file:set_cwd(Cwd)
end.
stop(Node) when is_atom(Node) ->
Pid = whereis(Node),

View File

@ -207,7 +207,7 @@ t_to_map(_) ->
{topic, <<"topic">>},
{payload, <<"payload">>},
{timestamp, emqx_message:timestamp(Msg)},
{extra, []}
{extra, #{}}
],
?assertEqual(List, emqx_message:to_list(Msg)),
?assertEqual(maps:from_list(List), emqx_message:to_map(Msg)).
@ -223,7 +223,7 @@ t_from_map(_) ->
topic => <<"topic">>,
payload => <<"payload">>,
timestamp => emqx_message:timestamp(Msg),
extra => []
extra => #{}
},
?assertEqual(Map, emqx_message:to_map(Msg)),
?assertEqual(Msg, emqx_message:from_map(emqx_message:to_map(Msg))).

View File

@ -95,7 +95,7 @@ to_audit(#{from := erlang_console, function := F, args := Args}) ->
http_method = <<"">>,
http_request = <<"">>,
duration_ms = 0,
args = iolist_to_binary(io_lib:format("~p: ~p~n", [F, Args]))
args = iolist_to_binary(io_lib:format("~p: ~ts", [F, Args]))
}.
log(_Level, undefined, _Handler) ->
@ -141,7 +141,7 @@ handle_continue(setup, State) ->
NewState = State#{role => mria_rlog:role()},
?AUDIT(alert, #{
cmd => emqx,
args => ["start"],
args => [<<"start">>],
version => emqx_release:version(),
from => cli,
duration_ms => 0

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_auth, [
{description, "EMQX Authentication and authorization"},
{vsn, "0.1.28"},
{vsn, "0.1.29"},
{modules, []},
{registered, [emqx_auth_sup]},
{applications, [

View File

@ -21,7 +21,9 @@
-include("emqx_authn.hrl").
-include("emqx_authn_chains.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
-behaviour(emqx_schema_hooks).
-export([

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_auto_subscribe, [
{description, "Auto subscribe Application"},
{vsn, "0.1.5"},
{vsn, "0.1.6"},
{registered, []},
{mod, {emqx_auto_subscribe_app, []}},
{applications, [

View File

@ -17,7 +17,7 @@
-export([init/1]).
-spec init(hocons:config()) -> {Module :: atom(), Config :: term()}.
-spec init(hocon:config()) -> {Module :: atom(), Config :: term()}.
init(Config) ->
do_init(Config).

View File

@ -87,12 +87,15 @@ paths() ->
"/bridges_probe"
].
error_schema(Code, Message) when is_atom(Code) ->
error_schema([Code], Message);
error_schema(Codes, Message) when is_list(Message) ->
error_schema(Codes, list_to_binary(Message));
error_schema(Codes, Message) when is_list(Codes) andalso is_binary(Message) ->
emqx_dashboard_swagger:error_codes(Codes, Message).
error_schema(Code, Message) ->
error_schema(Code, Message, _ExtraFields = []).
error_schema(Code, Message, ExtraFields) when is_atom(Code) ->
error_schema([Code], Message, ExtraFields);
error_schema(Codes, Message, ExtraFields) when is_list(Message) ->
error_schema(Codes, list_to_binary(Message), ExtraFields);
error_schema(Codes, Message, ExtraFields) when is_list(Codes) andalso is_binary(Message) ->
ExtraFields ++ emqx_dashboard_swagger:error_codes(Codes, Message).
get_response_body_schema() ->
emqx_dashboard_swagger:schema_with_examples(
@ -340,7 +343,8 @@ schema("/bridges/:id") ->
204 => <<"Bridge deleted">>,
400 => error_schema(
'BAD_REQUEST',
"Cannot delete bridge while active rules are defined for this bridge"
"Cannot delete bridge while active rules are defined for this bridge",
[{rules, mk(array(string()), #{desc => "Dependent Rule IDs"})}]
),
404 => error_schema('NOT_FOUND', "Bridge not found"),
503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
@ -517,11 +521,12 @@ schema("/bridges_probe") ->
reason := rules_depending_on_this_bridge,
rule_ids := RuleIds
}} ->
RulesStr = [[" ", I] || I <- RuleIds],
Msg = bin([
"Cannot delete bridge while active rules are depending on it:", RulesStr
]),
?BAD_REQUEST(Msg);
Msg0 = ?ERROR_MSG(
'BAD_REQUEST',
bin("Cannot delete bridge while active rules are depending on it")
),
Msg = Msg0#{rules => RuleIds},
{400, Msg};
{error, timeout} ->
?SERVICE_UNAVAILABLE(<<"request timeout">>);
{error, Reason} ->

View File

@ -91,12 +91,15 @@ paths() ->
"/action_types"
].
error_schema(Code, Message) when is_atom(Code) ->
error_schema([Code], Message);
error_schema(Codes, Message) when is_list(Message) ->
error_schema(Codes, list_to_binary(Message));
error_schema(Codes, Message) when is_list(Codes) andalso is_binary(Message) ->
emqx_dashboard_swagger:error_codes(Codes, Message).
error_schema(Code, Message) ->
error_schema(Code, Message, _ExtraFields = []).
error_schema(Code, Message, ExtraFields) when is_atom(Code) ->
error_schema([Code], Message, ExtraFields);
error_schema(Codes, Message, ExtraFields) when is_list(Message) ->
error_schema(Codes, list_to_binary(Message), ExtraFields);
error_schema(Codes, Message, ExtraFields) when is_list(Codes) andalso is_binary(Message) ->
ExtraFields ++ emqx_dashboard_swagger:error_codes(Codes, Message).
get_response_body_schema() ->
emqx_dashboard_swagger:schema_with_examples(
@ -247,7 +250,8 @@ schema("/actions/:id") ->
204 => <<"Bridge deleted">>,
400 => error_schema(
'BAD_REQUEST',
"Cannot delete bridge while active rules are defined for this bridge"
"Cannot delete bridge while active rules are defined for this bridge",
[{rules, mk(array(string()), #{desc => "Dependent Rule IDs"})}]
),
404 => error_schema('NOT_FOUND', "Bridge not found"),
503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
@ -445,15 +449,12 @@ schema("/action_types") ->
reason := rules_depending_on_this_bridge,
rule_ids := RuleIds
}} ->
RuleIdLists = [binary_to_list(iolist_to_binary(X)) || X <- RuleIds],
RulesStr = string:join(RuleIdLists, ", "),
Msg = io_lib:format(
"Cannot delete bridge while active rules are depending on it: ~s\n"
"Append ?also_delete_dep_actions=true to the request URL to delete "
"rule actions that depend on this bridge as well.",
[RulesStr]
Msg0 = ?ERROR_MSG(
'BAD_REQUEST',
bin("Cannot delete action while active rules are depending on it")
),
?BAD_REQUEST(iolist_to_binary(Msg));
Msg = Msg0#{rules => RuleIds},
{400, Msg};
{error, timeout} ->
?SERVICE_UNAVAILABLE(<<"request timeout">>);
{error, Reason} ->
@ -550,8 +551,8 @@ schema("/action_types") ->
'/action_types'(get, _Request) ->
?OK(emqx_bridge_v2_schema:types()).
maybe_deobfuscate_bridge_probe(#{<<"type">> := BridgeType, <<"name">> := BridgeName} = Params) ->
case emqx_bridge:lookup(BridgeType, BridgeName) of
maybe_deobfuscate_bridge_probe(#{<<"type">> := ActionType, <<"name">> := BridgeName} = Params) ->
case emqx_bridge_v2:lookup(ActionType, BridgeName) of
{ok, #{raw_config := RawConf}} ->
%% TODO check if RawConf optained above is compatible with the commented out code below
%% RawConf = emqx:get_raw_config([bridges, BridgeType, BridgeName], #{}),
@ -798,11 +799,12 @@ format_resource(
name := Name,
status := Status,
error := Error,
raw_config := RawConf,
raw_config := RawConf0,
resource_data := _ResourceData
},
Node
) ->
RawConf = fill_defaults(Type, RawConf0),
redact(
maps:merge(
RawConf#{
@ -933,6 +935,20 @@ aggregate_metrics(
M17 + N17
).
fill_defaults(Type, RawConf) ->
PackedConf = pack_bridge_conf(Type, RawConf),
FullConf = emqx_config:fill_defaults(emqx_bridge_v2_schema, PackedConf, #{}),
unpack_bridge_conf(Type, FullConf).
pack_bridge_conf(Type, RawConf) ->
#{<<"actions">> => #{bin(Type) => #{<<"foo">> => RawConf}}}.
unpack_bridge_conf(Type, PackedConf) ->
TypeBin = bin(Type),
#{<<"actions">> := Bridges} = PackedConf,
#{<<"foo">> := RawConf} = maps:get(TypeBin, Bridges),
RawConf.
format_bridge_status_and_error(Data) ->
maps:fold(fun format_resource_data/3, #{}, maps:with([status, error], Data)).

View File

@ -35,6 +35,9 @@
metrics_fields/0
]).
%% for testing only
-export([enterprise_api_schemas/1]).
%%======================================================================================
%% Hocon Schema Definitions
@ -57,7 +60,7 @@ api_schema(Method) ->
{<<"mqtt">>, emqx_bridge_mqtt_schema}
]
],
EE = enterprise_api_schemas(Method),
EE = ?MODULE:enterprise_api_schemas(Method),
hoconsc:union(bridge_api_union(Broker ++ EE)).
bridge_api_union(Refs) ->

View File

@ -18,7 +18,10 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
-import(hoconsc, [mk/2, ref/2]).

View File

@ -621,9 +621,10 @@ t_check_dependent_actions_on_delete(Config) ->
Config
),
%% deleting the bridge should fail because there is a rule that depends on it
{ok, 400, _} = request(
{ok, 400, Body} = request(
delete, uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions=false", Config
),
?assertMatch(#{<<"rules">> := [_ | _]}, emqx_utils_json:decode(Body, [return_maps])),
%% delete the rule first
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), Config),
%% then delete the bridge is OK

View File

@ -21,6 +21,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/asserts.hrl").
-import(emqx_common_test_helpers, [on_exit/1]).
@ -108,6 +109,14 @@ setup_mocks() ->
fun(Method) -> [{bridge_type_bin(), hoconsc:ref(?MODULE, "api_" ++ Method)}] end
),
catch meck:new(emqx_bridge_schema, MeckOpts),
meck:expect(
emqx_bridge_schema,
enterprise_api_schemas,
1,
fun(Method) -> [{bridge_type_bin(), hoconsc:ref(?MODULE, "api_" ++ Method)}] end
),
ok.
con_mod() ->
@ -142,7 +151,9 @@ con_schema() ->
fields("connector") ->
[
{enable, hoconsc:mk(any(), #{})},
{password, emqx_schema_secret:mk(#{required => false})},
{resource_opts, hoconsc:mk(map(), #{})},
{on_start_fun, hoconsc:mk(binary(), #{})},
{ssl, hoconsc:ref(ssl)}
];
fields("api_post") ->
@ -458,6 +469,29 @@ enable_rule_http(RuleId) ->
Params = #{<<"enable">> => true},
update_rule_http(RuleId, Params).
probe_bridge_http_api_v1(Opts) ->
Name = maps:get(name, Opts),
Overrides = maps:get(overrides, Opts, #{}),
BridgeConfig0 = emqx_utils_maps:deep_merge(bridge_config(), Overrides),
BridgeConfig = maps:without([<<"connector">>], BridgeConfig0),
Params = BridgeConfig#{<<"type">> => bridge_type_bin(), <<"name">> => Name},
Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
ct:pal("probe bridge (http v1) (~p):\n ~p", [#{name => Name}, Params]),
Res = request(post, Path, Params),
ct:pal("probe bridge (http v1) (~p) result:\n ~p", [#{name => Name}, Res]),
Res.
probe_action_http_api_v2(Opts) ->
Name = maps:get(name, Opts),
Overrides = maps:get(overrides, Opts, #{}),
BridgeConfig = emqx_utils_maps:deep_merge(bridge_config(), Overrides),
Params = BridgeConfig#{<<"type">> => bridge_type_bin(), <<"name">> => Name},
Path = emqx_mgmt_api_test_util:api_path(["actions_probe"]),
ct:pal("probe action (http v2) (~p):\n ~p", [#{name => Name}, Params]),
Res = request(post, Path, Params),
ct:pal("probe action (http v2) (~p) result:\n ~p", [#{name => Name}, Res]),
Res.
%%------------------------------------------------------------------------------
%% Test cases
%%------------------------------------------------------------------------------
@ -825,3 +859,63 @@ t_create_with_bad_name(_Config) ->
}
}}} = create_bridge_http_api_v1(Opts),
ok.
t_obfuscated_secrets_probe(_Config) ->
Name = <<"bridgev2">>,
Me = self(),
ets:new(emqx_bridge_v2_SUITE:fun_table_name(), [named_table, public]),
OnStartFun = emqx_bridge_v2_SUITE:wrap_fun(fun(Conf) ->
Me ! {on_start, Conf},
{ok, Conf}
end),
OriginalPassword = <<"supersecret">>,
Overrides = #{<<"password">> => OriginalPassword, <<"on_start_fun">> => OnStartFun},
%% Using the real password, like when creating the bridge for the first time.
?assertMatch(
{ok, {{_, 204, _}, _, _}},
probe_bridge_http_api_v1(#{name => Name, overrides => Overrides})
),
%% Check that we still can probe created bridges that use passwords.
?assertMatch(
{ok, {{_, 201, _}, _, #{}}},
create_bridge_http_api_v1(#{name => Name, overrides => Overrides})
),
%% Password is obfuscated
?assertMatch(
{ok, {{_, 200, _}, _, #{<<"password">> := <<"******">>}}},
get_bridge_http_api_v1(Name)
),
%% still using the password
?assertMatch(
{ok, {{_, 204, _}, _, _}},
probe_bridge_http_api_v1(#{name => Name, overrides => Overrides})
),
%% now with obfuscated password (loading the UI again)
?assertMatch(
{ok, {{_, 204, _}, _, _}},
probe_bridge_http_api_v1(#{
name => Name,
overrides => Overrides#{<<"password">> => <<"******">>}
})
),
?assertMatch(
{ok, {{_, 204, _}, _, _}},
probe_action_http_api_v2(#{
name => Name,
overrides => Overrides#{<<"password">> => <<"******">>}
})
),
%% We have to check that the connector was started with real passwords during dry runs
StartConfs = [Conf || {on_start, Conf} <- ?drainMailbox()],
Passwords = lists:map(fun(#{password := P}) -> P end, StartConfs),
?assert(lists:all(fun is_function/1, Passwords), #{passwords => Passwords}),
UnwrappedPasswords = [F() || F <- Passwords],
?assertEqual(
[OriginalPassword],
lists:usort(UnwrappedPasswords),
#{passwords => UnwrappedPasswords}
),
ok.

View File

@ -304,7 +304,7 @@ t_bridges_lifecycle(Config) ->
<<"status">> := <<"connected">>,
<<"node_status">> := [_ | _],
<<"connector">> := ?CONNECTOR_NAME,
<<"kafka">> := #{},
<<"parameters">> := #{},
<<"local_topic">> := _,
<<"resource_opts">> := _
}},
@ -958,11 +958,12 @@ t_cascade_delete_actions(Config) ->
},
Config
),
{ok, 400, _} = request(
{ok, 400, Body} = request(
delete,
uri([?ROOT, BridgeID]),
Config
),
?assertMatch(#{<<"rules">> := [_ | _]}, emqx_utils_json:decode(Body, [return_maps])),
{ok, 200, [_]} = request_json(get, uri([?ROOT]), Config),
%% Cleanup
{ok, 204, _} = request(
@ -1137,6 +1138,19 @@ t_cluster_later_join_metrics(Config) ->
),
ok.
t_raw_config_response_defaults(Config) ->
Params = maps:remove(<<"enable">>, ?KAFKA_BRIDGE(?BRIDGE_NAME)),
?assertMatch(
{ok, 201, #{<<"enable">> := true}},
request_json(
post,
uri([?ROOT]),
Params,
Config
)
),
ok.
%%% helpers
listen_on_random_port() ->
SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}],

View File

@ -25,8 +25,8 @@
non_deprecated_fields(Fields) ->
[K || {K, Schema} <- Fields, not hocon_schema:is_deprecated(Schema)].
find_resource_opts_fields(SchemaMod, FieldName) ->
Fields = hocon_schema:fields(SchemaMod, FieldName),
find_resource_opts_fields(SchemaMod, StructName) ->
Fields = hocon_schema:fields(SchemaMod, StructName),
case lists:keyfind(resource_opts, 1, Fields) of
false ->
undefined;
@ -35,8 +35,8 @@ find_resource_opts_fields(SchemaMod, FieldName) ->
end.
get_resource_opts_subfields(Sc) ->
?R_REF(SchemaModRO, FieldNameRO) = hocon_schema:field_schema(Sc, type),
ROFields = non_deprecated_fields(hocon_schema:fields(SchemaModRO, FieldNameRO)),
?R_REF(SchemaModRO, StructNameRO) = hocon_schema:field_schema(Sc, type),
ROFields = non_deprecated_fields(hocon_schema:fields(SchemaModRO, StructNameRO)),
proplists:get_keys(ROFields).
%%------------------------------------------------------------------------------
@ -107,3 +107,33 @@ connector_resource_opts_test() ->
}
),
ok.
actions_api_spec_post_fields_test() ->
?UNION(Union) = emqx_bridge_v2_schema:post_request(),
Schemas =
lists:map(
fun(?R_REF(SchemaMod, StructName)) ->
{SchemaMod, hocon_schema:fields(SchemaMod, StructName)}
end,
hoconsc:union_members(Union)
),
MinimalFields0 =
[
binary_to_atom(F)
|| F <- emqx_bridge_v2_schema:top_level_common_action_keys(),
F =/= <<"local_topic">>
],
MinimalFields = [type, name | MinimalFields0],
MissingFields =
lists:filtermap(
fun({SchemaMod, FieldSchemas}) ->
Missing = MinimalFields -- proplists:get_keys(FieldSchemas),
case Missing of
[] -> false;
_ -> {true, {SchemaMod, Missing}}
end
end,
Schemas
),
?assertEqual(#{}, maps:from_list(MissingFields)),
ok.

View File

@ -88,7 +88,7 @@ keyspace(_) -> undefined.
callback_mode() -> async_if_possible.
-spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}.
-spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}.
on_start(
InstId,
#{

View File

@ -27,6 +27,7 @@
-type service_account_json() :: emqx_bridge_gcp_pubsub:service_account_json().
-type project_id() :: binary().
-type duration() :: non_neg_integer().
-type config() :: #{
connect_timeout := emqx_schema:duration_ms(),
max_retries := non_neg_integer(),
@ -35,12 +36,12 @@
any() => term()
}.
-opaque state() :: #{
connect_timeout := timer:time(),
connect_timeout := duration(),
jwt_config := emqx_connector_jwt:jwt_config(),
max_retries := non_neg_integer(),
pool_name := binary(),
project_id := project_id(),
request_ttl := infinity | timer:time()
request_ttl := erlang:timeout()
}.
-type headers() :: [{binary(), iodata()}].
-type body() :: iodata().
@ -414,7 +415,7 @@ reply_delegator(ResourceId, ReplyFunAndArgs, Response) ->
Result = handle_response(Response, ResourceId, _QueryMode = async),
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result).
-spec do_get_status(resource_id(), timer:time()) -> boolean().
-spec do_get_status(resource_id(), duration()) -> boolean().
do_get_status(ResourceId, Timeout) ->
Workers = [Worker || {_WorkerName, Worker} <- ehttpc:workers(ResourceId)],
DoPerWorker =

View File

@ -30,12 +30,13 @@
-type bridge_name() :: atom() | binary().
-type ack_id() :: binary().
-type message_id() :: binary().
-type duration() :: non_neg_integer().
-type config() :: #{
ack_deadline := emqx_schema:timeout_duration_s(),
ack_retry_interval := emqx_schema:timeout_duration_ms(),
client := emqx_bridge_gcp_pubsub_client:state(),
ecpool_worker_id => non_neg_integer(),
forget_interval := timer:time(),
forget_interval := duration(),
hookpoint := binary(),
instance_id := binary(),
mqtt_config => emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(),
@ -52,7 +53,7 @@
async_workers := #{pid() => reference()},
client := emqx_bridge_gcp_pubsub_client:state(),
ecpool_worker_id := non_neg_integer(),
forget_interval := timer:time(),
forget_interval := duration(),
hookpoint := binary(),
instance_id := binary(),
mqtt_config := emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(),

View File

@ -70,7 +70,7 @@ query_mode(_Config) -> async.
on_start(InstanceId, Config0) ->
?SLOG(info, #{
msg => "starting_gcp_pubsub_bridge",
config => Config0
instance_id => InstanceId
}),
Config = maps:update_with(service_account_json, fun emqx_utils_maps:binary_key_map/1, Config0),
#{service_account_json := #{<<"project_id">> := ProjectId}} = Config,

View File

@ -274,16 +274,17 @@ t_write_failure(Config) ->
health_check_resource_down(Config),
case QueryMode of
sync ->
case EnableBatch of
true ->
%% append to batch always returns ok
?assertMatch(ok, send_message(Config, Data));
false ->
?assertMatch(
{error, {cannot_list_shards, {<<?STREAM>>, econnrefused}}},
send_message(Config, Data)
)
end;
%% Error (call timeout) is expected for both with_batch and without_batch.
%% `health_check_resource_down(Config)` above calls health check and asserts
%% that resource is already down.
%% So, emqx_resource_manager updates it state to disconnected before returning health_check result.
%% After that, emqx_resource_buffer_worker reads resource state and doesn't even attempt calling
%% hstreamdb connector, since it is disconnected, see: emqx_resource_buffer_worker.erl:1163:
%% ```
%% do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) ->
%% ?RESOURCE_ERROR(not_connected, "resource not connected").
%% ```
?assertMatch({error, _}, send_message(Config, Data));
async ->
%% TODO: async mode is not supported yet,
%% but it will return ok if calling emqx_resource_buffer_worker:async_query/3,

View File

@ -23,6 +23,7 @@
]).
-define(CONNECTOR_TYPE, matrix).
-define(ACTION_TYPE, matrix).
%% -------------------------------------------------------------------------------------------------
%% api
@ -44,7 +45,7 @@ namespace() -> "bridge_matrix".
roots() -> [].
fields("post") ->
emqx_bridge_pgsql:fields("post", matrix);
emqx_bridge_pgsql:fields("post", ?ACTION_TYPE, "config");
fields("config_connector") ->
emqx_bridge_pgsql:fields("config_connector");
fields(action) ->
@ -61,7 +62,7 @@ fields("put_bridge_v2") ->
fields("get_bridge_v2") ->
emqx_bridge_pgsql:fields(pgsql_action);
fields("post_bridge_v2") ->
emqx_bridge_pgsql:fields(pgsql_action);
emqx_bridge_pgsql:fields("post", ?ACTION_TYPE, pgsql_action);
fields(Field) when
Field == "get_connector";
Field == "put_connector";

View File

@ -13,13 +13,15 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
%% `hocon_schema' API
-export([
namespace/0,
roots/0,
fields/1,
desc/1,
fields/2
desc/1
]).
%% for sharing with other actions
-export([fields/3]).
%% Examples
-export([
@ -33,9 +35,7 @@
values_conn_bridge_examples/2
]).
-define(PGSQL_HOST_OPTIONS, #{
default_port => ?PGSQL_DEFAULT_PORT
}).
-define(ACTION_TYPE, pgsql).
%% Hocon Schema Definitions
namespace() -> "bridge_pgsql".
@ -81,7 +81,7 @@ fields("put_bridge_v2") ->
fields("get_bridge_v2") ->
fields(pgsql_action);
fields("post_bridge_v2") ->
fields(pgsql_action);
fields("post", pgsql, pgsql_action);
fields("config") ->
[
{enable, hoconsc:mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
@ -99,14 +99,14 @@ fields("config") ->
(emqx_postgresql:fields(config) --
emqx_connector_schema_lib:prepare_statement_fields());
fields("post") ->
fields("post", pgsql);
fields("post", ?ACTION_TYPE, "config");
fields("put") ->
fields("config");
fields("get") ->
emqx_bridge_schema:status_fields() ++ fields("post").
fields("post", Type) ->
[type_field(Type), name_field() | fields("config")].
fields("post", Type, StructName) ->
[type_field(Type), name_field() | fields(StructName)].
type_field(Type) ->
{type, hoconsc:mk(hoconsc:enum([Type]), #{required => true, desc => ?DESC("desc_type")})}.

View File

@ -604,7 +604,11 @@ t_missing_data(Config) ->
#{
result :=
{error,
{unrecoverable_error, {error, error, <<"23502">>, not_null_violation, _, _}}}
{unrecoverable_error, #{
error_code := <<"23502">>,
error_codename := not_null_violation,
severity := error
}}}
},
Event
),

View File

@ -25,9 +25,11 @@
-include("emqx_bridge_syskeeper.hrl").
-type duration() :: non_neg_integer().
-type state() :: #{
ack_mode := need_ack | no_ack,
ack_timeout := timer:time(),
ack_timeout := duration(),
socket := undefined | inet:socket(),
frame_state := emqx_bridge_syskeeper_frame:state(),
last_error := undefined | tuple()

View File

@ -23,6 +23,7 @@
]).
-define(CONNECTOR_TYPE, timescale).
-define(ACTION_TYPE, timescale).
%% -------------------------------------------------------------------------------------------------
%% api
@ -44,7 +45,7 @@ namespace() -> "bridge_timescale".
roots() -> [].
fields("post") ->
emqx_bridge_pgsql:fields("post", timescale);
emqx_bridge_pgsql:fields("post", ?ACTION_TYPE, "config");
fields("config_connector") ->
emqx_postgresql_connector_schema:fields("config_connector");
fields(action) ->
@ -61,7 +62,7 @@ fields("put_bridge_v2") ->
fields("get_bridge_v2") ->
emqx_bridge_pgsql:fields(pgsql_action);
fields("post_bridge_v2") ->
emqx_bridge_pgsql:fields(pgsql_action);
emqx_bridge_pgsql:fields("post", ?ACTION_TYPE, pgsql_action);
fields(Field) when
Field == "get_connector";
Field == "put_connector";

View File

@ -468,7 +468,7 @@ fill_defaults(Conf) ->
Conf1 = emqx_config:fill_defaults(Conf),
filter_cluster_conf(Conf1).
-define(ALL_STRATEGY, [<<"manual">>, <<"static">>, <<"dns">>, <<"etcd">>, <<"k8s">>, <<"mcast">>]).
-define(ALL_STRATEGY, [<<"manual">>, <<"static">>, <<"dns">>, <<"etcd">>, <<"k8s">>]).
filter_cluster_conf(#{<<"cluster">> := #{<<"discovery_strategy">> := Strategy} = Cluster} = Conf) ->
Cluster1 = maps:without(lists:delete(Strategy, ?ALL_STRATEGY), Cluster),

View File

@ -154,7 +154,7 @@ fields("cluster") ->
)},
{"discovery_strategy",
sc(
hoconsc:enum([manual, static, dns, etcd, k8s, mcast]),
hoconsc:enum([manual, static, dns, etcd, k8s]),
#{
default => manual,
desc => ?DESC(cluster_discovery_strategy),
@ -208,11 +208,6 @@ fields("cluster") ->
?R_REF(cluster_static),
#{}
)},
{"mcast",
sc(
?R_REF(cluster_mcast),
#{importance => ?IMPORTANCE_HIDDEN}
)},
{"dns",
sc(
?R_REF(cluster_dns),
@ -251,81 +246,6 @@ fields(cluster_static) ->
}
)}
];
fields(cluster_mcast) ->
[
{"addr",
sc(
string(),
#{
default => <<"239.192.0.1">>,
desc => ?DESC(cluster_mcast_addr),
'readOnly' => true
}
)},
{"ports",
sc(
hoconsc:array(integer()),
#{
default => [4369, 4370],
'readOnly' => true,
desc => ?DESC(cluster_mcast_ports)
}
)},
{"iface",
sc(
string(),
#{
default => <<"0.0.0.0">>,
desc => ?DESC(cluster_mcast_iface),
'readOnly' => true
}
)},
{"ttl",
sc(
range(0, 255),
#{
default => 255,
desc => ?DESC(cluster_mcast_ttl),
'readOnly' => true
}
)},
{"loop",
sc(
boolean(),
#{
default => true,
desc => ?DESC(cluster_mcast_loop),
'readOnly' => true
}
)},
{"sndbuf",
sc(
emqx_schema:bytesize(),
#{
default => <<"16KB">>,
desc => ?DESC(cluster_mcast_sndbuf),
'readOnly' => true
}
)},
{"recbuf",
sc(
emqx_schema:bytesize(),
#{
default => <<"16KB">>,
desc => ?DESC(cluster_mcast_recbuf),
'readOnly' => true
}
)},
{"buffer",
sc(
emqx_schema:bytesize(),
#{
default => <<"32KB">>,
desc => ?DESC(cluster_mcast_buffer),
'readOnly' => true
}
)}
];
fields(cluster_dns) ->
[
{"name",
@ -1100,8 +1020,6 @@ desc("cluster") ->
?DESC("desc_cluster");
desc(cluster_static) ->
?DESC("desc_cluster_static");
desc(cluster_mcast) ->
?DESC("desc_cluster_mcast");
desc(cluster_dns) ->
?DESC("desc_cluster_dns");
desc(cluster_etcd) ->
@ -1423,17 +1341,6 @@ map(Name, Type) -> hoconsc:map(Name, Type).
cluster_options(static, Conf) ->
[{seeds, conf_get("cluster.static.seeds", Conf, [])}];
cluster_options(mcast, Conf) ->
{ok, Addr} = inet:parse_address(conf_get("cluster.mcast.addr", Conf)),
{ok, Iface} = inet:parse_address(conf_get("cluster.mcast.iface", Conf)),
Ports = conf_get("cluster.mcast.ports", Conf),
[
{addr, Addr},
{ports, Ports},
{iface, Iface},
{ttl, conf_get("cluster.mcast.ttl", Conf, 1)},
{loop, conf_get("cluster.mcast.loop", Conf, true)}
];
cluster_options(dns, Conf) ->
[
{name, conf_get("cluster.dns.name", Conf)},
@ -1520,7 +1427,8 @@ ensure_file_handlers(Conf, _Opts) ->
convert_rotation(undefined, _Opts) -> undefined;
convert_rotation(#{} = Rotation, _Opts) -> maps:get(<<"count">>, Rotation, 10);
convert_rotation(Count, _Opts) when is_integer(Count) -> Count.
convert_rotation(Count, _Opts) when is_integer(Count) -> Count;
convert_rotation(Count, _Opts) -> throw({"bad_rotation", Count}).
ensure_unicode_path(undefined, _) ->
undefined;

View File

@ -610,11 +610,12 @@ format_resource(
#{
type := Type,
name := ConnectorName,
raw_config := RawConf,
raw_config := RawConf0,
resource_data := ResourceData
},
Node
) ->
RawConf = fill_defaults(Type, RawConf0),
redact(
maps:merge(
RawConf#{
@ -638,6 +639,20 @@ format_resource_data(added_channels, Channels, Result) ->
format_resource_data(K, V, Result) ->
Result#{K => V}.
fill_defaults(Type, RawConf) ->
PackedConf = pack_connector_conf(Type, RawConf),
FullConf = emqx_config:fill_defaults(emqx_connector_schema, PackedConf, #{}),
unpack_connector_conf(Type, FullConf).
pack_connector_conf(Type, RawConf) ->
#{<<"connectors">> => #{bin(Type) => #{<<"foo">> => RawConf}}}.
unpack_connector_conf(Type, PackedConf) ->
TypeBin = bin(Type),
#{<<"connectors">> := Bridges} = PackedConf,
#{<<"foo">> := RawConf} = maps:get(TypeBin, Bridges),
RawConf.
format_action(ActionId) ->
element(2, emqx_bridge_v2:parse_id(ActionId)).

View File

@ -33,8 +33,9 @@
-type jwt() :: binary().
-type wrapped_jwk() :: fun(() -> jose_jwk:key()).
-type jwk() :: jose_jwk:key().
-type duration() :: non_neg_integer().
-type jwt_config() :: #{
expiration := timer:time(),
expiration := duration(),
resource_id := resource_id(),
table := ets:table(),
jwk := wrapped_jwk() | jwk(),

View File

@ -41,10 +41,12 @@
-include_lib("jose/include/jose_jwk.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-type duration() :: non_neg_integer().
-type config() :: #{
private_key := binary(),
resource_id := resource_id(),
expiration := timer:time(),
expiration := duration(),
table := ets:table(),
iss := binary(),
sub := binary(),
@ -54,9 +56,9 @@
}.
-type jwt() :: binary().
-type state() :: #{
refresh_timer := undefined | timer:tref() | reference(),
refresh_timer := undefined | reference(),
resource_id := resource_id(),
expiration := timer:time(),
expiration := duration(),
table := ets:table(),
jwt := undefined | jwt(),
%% only undefined during startup
@ -221,7 +223,7 @@ censor_secret(undefined) ->
censor_secret(_Secret) ->
"******".
-spec cancel_timer(undefined | timer:tref() | reference()) -> ok.
-spec cancel_timer(undefined | reference() | reference()) -> ok.
cancel_timer(undefined) ->
ok;
cancel_timer(TRef) ->

View File

@ -18,7 +18,10 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
-import(hoconsc, [mk/2, ref/2]).
@ -557,6 +560,12 @@ to_bin(Bin) when is_binary(Bin) ->
to_bin(Something) ->
Something.
node_name() ->
{"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}.
status() ->
hoconsc:enum([connected, disconnected, connecting, inconsistent]).
-ifdef(TEST).
-include_lib("hocon/include/hocon_types.hrl").
schema_homogeneous_test() ->
@ -591,12 +600,6 @@ is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) ->
}}
end.
status() ->
hoconsc:enum([connected, disconnected, connecting, inconsistent]).
node_name() ->
{"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}.
common_field_names() ->
[
enable, description

View File

@ -766,7 +766,7 @@ t_actions_field(Config) ->
<<"status">> := <<"connected">>,
<<"node_status">> := [_ | _],
<<"connector">> := Name,
<<"kafka">> := #{},
<<"parameters">> := #{},
<<"local_topic">> := _,
<<"resource_opts">> := _
}},
@ -821,7 +821,7 @@ t_fail_delete_with_action(Config) ->
<<"status">> := <<"connected">>,
<<"node_status">> := [_ | _],
<<"connector">> := Name,
<<"kafka">> := #{},
<<"parameters">> := #{},
<<"local_topic">> := _,
<<"resource_opts">> := _
}},
@ -845,6 +845,19 @@ t_fail_delete_with_action(Config) ->
),
ok.
t_raw_config_response_defaults(Config) ->
Params = maps:without([<<"enable">>, <<"resource_opts">>], ?KAFKA_CONNECTOR(?CONNECTOR_NAME)),
?assertMatch(
{ok, 201, #{<<"enable">> := true, <<"resource_opts">> := #{}}},
request_json(
post,
uri(["connectors"]),
Params,
Config
)
),
ok.
%%% helpers
listen_on_random_port() ->
SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}],

View File

@ -1,6 +1,6 @@
{application, emqx_ctl, [
{description, "Backend for emqx_ctl script"},
{vsn, "0.1.5"},
{vsn, "0.1.6"},
{registered, []},
{mod, {emqx_ctl_app, []}},
{applications, [

View File

@ -339,11 +339,25 @@ audit_log(Level, From, Log) ->
try
apply(Mod, Fun, [Level, From, normalize_audit_log_args(Log)])
catch
_:{aborted, {no_exists, emqx_audit}} ->
case Log of
#{cmd := cluster, args := ["leave"]} ->
ok;
_ ->
?LOG_ERROR(#{
msg => "ctl_command_crashed",
reason => "emqx_audit table not found",
log => normalize_audit_log_args(Log),
from => From
})
end;
_:Reason:Stacktrace ->
?LOG_ERROR(#{
msg => "ctl_command_crashed",
stacktrace => Stacktrace,
reason => Reason
reason => Reason,
log => normalize_audit_log_args(Log),
from => From
})
end
end.

View File

@ -114,7 +114,7 @@ add_user(Username, Password, Role, Desc) when is_binary(Username), is_binary(Pas
end.
do_add_user(Username, Password, Role, Desc) ->
Res = mria:transaction(?DASHBOARD_SHARD, fun add_user_/4, [Username, Password, Role, Desc]),
Res = mria:sync_transaction(?DASHBOARD_SHARD, fun add_user_/4, [Username, Password, Role, Desc]),
return(Res).
%% 0-9 or A-Z or a-z or $_
@ -191,7 +191,7 @@ force_add_user(Username, Password, Role, Desc) ->
description = Desc
})
end,
case mria:transaction(?DASHBOARD_SHARD, AddFun) of
case mria:sync_transaction(?DASHBOARD_SHARD, AddFun) of
{atomic, ok} -> ok;
{aborted, Reason} -> {error, Reason}
end.
@ -227,7 +227,7 @@ remove_user(Username) ->
_ -> mnesia:delete({?ADMIN, Username})
end
end,
case return(mria:transaction(?DASHBOARD_SHARD, Trans)) of
case return(mria:sync_transaction(?DASHBOARD_SHARD, Trans)) of
{ok, Result} ->
_ = emqx_dashboard_token:destroy_by_username(Username),
{ok, Result};
@ -242,7 +242,11 @@ update_user(Username, Role, Desc) ->
ok ->
case
return(
mria:transaction(?DASHBOARD_SHARD, fun update_user_/3, [Username, Role, Desc])
mria:sync_transaction(
?DASHBOARD_SHARD,
fun update_user_/3,
[Username, Role, Desc]
)
)
of
{ok, {true, Result}} ->
@ -324,7 +328,7 @@ update_pwd(Username, Fun) ->
end,
mnesia:write(Fun(User))
end,
return(mria:transaction(?DASHBOARD_SHARD, Trans)).
return(mria:sync_transaction(?DASHBOARD_SHARD, Trans)).
-spec lookup_user(dashboard_username()) -> [emqx_admin()].
lookup_user(Username) ->

View File

@ -46,6 +46,7 @@ log_meta(Meta, Req) ->
true ->
undefined;
false ->
Code = maps:get(code, Meta),
Meta1 = #{
time => logger:timestamp(),
from => from(Meta),
@ -56,8 +57,8 @@ log_meta(Meta, Req) ->
%% method for http filter api.
http_method => Method,
http_request => http_request(Meta),
http_status_code => maps:get(code, Meta),
operation_result => operation_result(Meta),
http_status_code => Code,
operation_result => operation_result(Code, Meta),
node => node()
},
Meta2 = maps:without([req_start, req_end, method, headers, body, bindings, code], Meta),
@ -105,8 +106,9 @@ operation_type(Meta) ->
http_request(Meta) ->
maps:with([method, headers, bindings, body], Meta).
operation_result(#{failure := _}) -> failure;
operation_result(_) -> success.
operation_result(Code, _) when Code >= 300 -> failure;
operation_result(_, #{failure := _}) -> failure;
operation_result(_, _) -> success.
level(get, _Code) -> debug;
level(_, Code) when Code >= 200 andalso Code < 300 -> info;

View File

@ -119,7 +119,7 @@ do_sign(#?ADMIN{username = Username} = User, Password) ->
{_, Token} = jose_jws:compact(Signed),
Role = emqx_dashboard_admin:role(User),
JWTRec = format(Token, Username, Role, ExpTime),
_ = mria:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [JWTRec]),
_ = mria:sync_transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [JWTRec]),
{ok, Role, Token}.
-spec do_verify(_, Token :: binary()) ->
@ -141,7 +141,7 @@ do_verify(Req, Token) ->
do_destroy(Token) ->
Fun = fun mnesia:delete/1,
{atomic, ok} = mria:transaction(?DASHBOARD_SHARD, Fun, [{?TAB, Token}]),
{atomic, ok} = mria:sync_transaction(?DASHBOARD_SHARD, Fun, [{?TAB, Token}]),
ok.
do_destroy_by_username(Username) ->
@ -266,7 +266,7 @@ check_rbac(_Req, JWT) ->
save_new_jwt(OldJWT) ->
#?ADMIN_JWT{exptime = _ExpTime, extra = _Extra, username = Username} = OldJWT,
NewJWT = OldJWT#?ADMIN_JWT{exptime = jwt_expiration_time()},
{atomic, Res} = mria:transaction(
{atomic, Res} = mria:sync_transaction(
?DASHBOARD_SHARD,
fun mnesia:write/1,
[NewJWT]

View File

@ -30,6 +30,6 @@ evict_session_channel(Node, ClientId, ConnInfo, ClientInfo) ->
rpc:call(Node, emqx_eviction_agent, evict_session_channel, [ClientId, ConnInfo, ClientInfo]).
%% Introduced in v2:
-spec all_channels_count([node()], time:time()) -> emqx_rpc:erpc_multicall(non_neg_integer()).
-spec all_channels_count([node()], timeout()) -> emqx_rpc:erpc_multicall(non_neg_integer()).
all_channels_count(Nodes, Timeout) ->
erpc:multicall(Nodes, emqx_eviction_agent, all_local_channels_count, [], Timeout).

View File

@ -20,6 +20,8 @@
%% module if it integrated with emqx_gateway_conn module
-module(emqx_gateway_channel).
-export_type([gen_server_from/0]).
-type channel() :: any().
%%--------------------------------------------------------------------

View File

@ -16,6 +16,8 @@
-module(emqx_gateway_impl).
-export_type([state/0]).
-include("emqx_gateway.hrl").
-type state() :: map().

View File

@ -96,8 +96,6 @@
-record(state, {
%% Gateway Name
gwname :: gateway_name(),
%% ClientId Locker for CM
locker :: pid(),
%% ClientId Registry server
registry :: pid(),
chan_pmon :: emqx_pmon:pmon()
@ -776,7 +774,7 @@ init(Options) ->
{ok, Registry} = emqx_gateway_cm_registry:start_link(GwName),
%% Start locker process
{ok, Locker} = ekka_locker:start_link(lockername(GwName)),
{ok, _LockerPid} = ekka_locker:start_link(lockername(GwName)),
%% Interval update stats
%% TODO: v0.2
@ -784,7 +782,6 @@ init(Options) ->
{ok, #state{
gwname = GwName,
locker = Locker,
registry = Registry,
chan_pmon = emqx_pmon:new()
}}.
@ -812,9 +809,9 @@ handle_info(
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, #state{registry = Registry, locker = Locker}) ->
terminate(_Reason, #state{registry = Registry, gwname = GwName}) ->
_ = gen_server:stop(Registry),
_ = ekka_locker:stop(Locker),
_ = ekka_locker:stop(lockername(GwName)),
ok.
code_change(_OldVsn, State, _Extra) ->

View File

@ -53,7 +53,7 @@
-record(channel, {chid, pid}).
%% @doc Start the global channel registry for the given gateway name.
-spec start_link(gateway_name()) -> gen_server:startlink_ret().
-spec start_link(gateway_name()) -> emqx_types:startlink_ret().
start_link(Name) ->
gen_server:start_link(?MODULE, [Name], []).

View File

@ -17,6 +17,8 @@
%% @doc The gateway instance context
-module(emqx_gateway_ctx).
-export_type([context/0]).
-include("emqx_gateway.hrl").
%% @doc The running context for a Connection/Channel process.

View File

@ -89,17 +89,17 @@
-elvis([{elvis_style, god_modules, disable}]).
-spec childspec(supervisor:worker(), Mod :: atom()) ->
-spec childspec(worker | supervisor, Mod :: atom()) ->
supervisor:child_spec().
childspec(Type, Mod) ->
childspec(Mod, Type, Mod, []).
-spec childspec(supervisor:worker(), Mod :: atom(), Args :: list()) ->
-spec childspec(worker | supervisor, Mod :: atom(), Args :: list()) ->
supervisor:child_spec().
childspec(Type, Mod, Args) ->
childspec(Mod, Type, Mod, Args).
-spec childspec(atom(), supervisor:worker(), Mod :: atom(), Args :: list()) ->
-spec childspec(atom(), worker | supervisor, Mod :: atom(), Args :: list()) ->
supervisor:child_spec().
childspec(Id, Type, Mod, Args) ->
#{
@ -121,7 +121,7 @@ supervisor_ret({error, {Reason, Child}}) ->
supervisor_ret(Ret) ->
Ret.
-spec find_sup_child(Sup :: pid() | atom(), ChildId :: supervisor:child_id()) ->
-spec find_sup_child(Sup :: pid() | atom(), ChildId :: term()) ->
false
| {ok, pid()}.
find_sup_child(Sup, ChildId) ->

View File

@ -40,7 +40,7 @@ introduced_in() ->
"5.0.0".
-spec lookup_by_clientid([node()], emqx_gateway_cm:gateway_name(), emqx_types:clientid()) ->
emqx_rpc:multicall_return([pid()]).
emqx_rpc:multicall_result([pid()]).
lookup_by_clientid(Nodes, GwName, ClientId) ->
rpc:multicall(Nodes, emqx_gateway_cm, do_lookup_by_clientid, [GwName, ClientId]).

View File

@ -50,11 +50,11 @@ init_per_suite(Conf) ->
emqx_config:delete_override_conf_files(),
emqx_config:erase(gateway),
emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT),
emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_auth, emqx_auth_mnesia, emqx_gateway]),
emqx_mgmt_api_test_util:init_suite([grpc, emqx_conf, emqx_auth, emqx_auth_mnesia, emqx_gateway]),
Conf.
end_per_suite(Conf) ->
emqx_mgmt_api_test_util:end_suite([emqx_gateway, emqx_auth_mnesia, emqx_auth, emqx_conf]),
emqx_mgmt_api_test_util:end_suite([emqx_gateway, emqx_auth_mnesia, emqx_auth, emqx_conf, grpc]),
Conf.
init_per_testcase(t_gateway_fail, Config) ->

View File

@ -69,7 +69,7 @@ init_per_suite(Config) ->
emqx_gateway_test_utils:load_all_gateway_apps(),
emqx_config:erase(gateway),
init_gateway_conf(),
emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_auth, emqx_auth_http, emqx_gateway]),
emqx_mgmt_api_test_util:init_suite([grpc, emqx_conf, emqx_auth, emqx_auth_http, emqx_gateway]),
application:ensure_all_started(cowboy),
emqx_gateway_auth_ct:start(),
timer:sleep(500),
@ -78,7 +78,9 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
emqx_gateway_auth_ct:stop(),
emqx_config:erase(gateway),
emqx_mgmt_api_test_util:end_suite([cowboy, emqx_conf, emqx_auth, emqx_auth_http, emqx_gateway]),
emqx_mgmt_api_test_util:end_suite([
cowboy, emqx_conf, emqx_auth, emqx_auth_http, emqx_gateway, grpc
]),
Config.
init_per_testcase(_Case, Config) ->

View File

@ -69,7 +69,7 @@ init_per_suite(Config) ->
emqx_gateway_test_utils:load_all_gateway_apps(),
init_gateway_conf(),
emqx_mgmt_api_test_util:init_suite([
emqx_conf, emqx_auth, emqx_auth_http, emqx_gateway
grpc, emqx_conf, emqx_auth, emqx_auth_http, emqx_gateway
]),
meck:new(emqx_authz_file, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_authz_file, create, fun(S) -> S end),
@ -83,7 +83,7 @@ end_per_suite(Config) ->
ok = emqx_authz_test_lib:restore_authorizers(),
emqx_config:erase(gateway),
emqx_mgmt_api_test_util:end_suite([
emqx_gateway, emqx_auth_http, emqx_auth, emqx_conf
emqx_gateway, emqx_auth_http, emqx_auth, emqx_conf, grpc
]),
Config.

View File

@ -40,7 +40,7 @@
token :: token() | undefined,
observe :: 0 | 1 | undefined | observed,
state :: atom(),
timers :: maps:map(),
timers :: map(),
transport :: emqx_coap_transport:transport()
}).
-type state_machine() :: #state_machine{}.

View File

@ -131,7 +131,7 @@ stats(#channel{subscriptions = Subs}) ->
%% Init the channel
%%--------------------------------------------------------------------
-spec init(emqx_exproto_types:conninfo(), map()) -> channel().
-spec init(emqx_types:conninfo(), map()) -> channel().
init(
ConnInfo = #{
socktype := Socktype,

View File

@ -29,6 +29,8 @@
is_empty/1
]).
-export_type([grpc_client_state/0]).
-define(CONN_HANDLER_MOD, emqx_exproto_v_1_connection_handler_client).
-define(CONN_UNARY_HANDLER_MOD, emqx_exproto_v_1_connection_unary_handler_client).

View File

@ -38,13 +38,18 @@
unsubscribe/2
]).
%% TODO:
%% The spec should be :: grpc_cowboy_h:error_response()
%% But there is no such module as grpc_cowboy_h
-type error_response() :: term().
%%--------------------------------------------------------------------
%% gRPC ConnectionAdapter service
%%--------------------------------------------------------------------
-spec send(emqx_exproto_pb:send_bytes_request(), grpc:metadata()) ->
{ok, emqx_exproto_pb:code_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
| {error, error_response()}.
send(Req = #{conn := Conn, bytes := Bytes}, Md) ->
?SLOG(debug, #{
msg => "recv_grpc_function_call",
@ -55,7 +60,7 @@ send(Req = #{conn := Conn, bytes := Bytes}, Md) ->
-spec close(emqx_exproto_pb:close_socket_request(), grpc:metadata()) ->
{ok, emqx_exproto_pb:code_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
| {error, error_response()}.
close(Req = #{conn := Conn}, Md) ->
?SLOG(debug, #{
msg => "recv_grpc_function_call",
@ -66,7 +71,7 @@ close(Req = #{conn := Conn}, Md) ->
-spec authenticate(emqx_exproto_pb:authenticate_request(), grpc:metadata()) ->
{ok, emqx_exproto_pb:code_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
| {error, error_response()}.
authenticate(
Req = #{
conn := Conn,
@ -89,7 +94,7 @@ authenticate(
-spec start_timer(emqx_exproto_pb:timer_request(), grpc:metadata()) ->
{ok, emqx_exproto_pb:code_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
| {error, error_response()}.
start_timer(Req = #{conn := Conn, type := Type, interval := Interval}, Md) when
Type =:= 'KEEPALIVE' andalso Interval > 0
->
@ -111,7 +116,7 @@ start_timer(Req, Md) ->
-spec publish(emqx_exproto_pb:publish_request(), grpc:metadata()) ->
{ok, emqx_exproto_pb:code_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
| {error, error_response()}.
publish(Req = #{conn := Conn, topic := Topic, qos := Qos, payload := Payload}, Md) when
?IS_QOS(Qos)
->
@ -132,7 +137,7 @@ publish(Req, Md) ->
-spec raw_publish(emqx_exproto_pb:raw_publish_request(), grpc:metadata()) ->
{ok, emqx_exproto_pb:code_response(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
| {error, error_response()}.
raw_publish(Req = #{topic := Topic, qos := Qos, payload := Payload}, Md) ->
?SLOG(debug, #{
msg => "recv_grpc_function_call",
@ -145,7 +150,7 @@ raw_publish(Req = #{topic := Topic, qos := Qos, payload := Payload}, Md) ->
-spec subscribe(emqx_exproto_pb:subscribe_request(), grpc:metadata()) ->
{ok, emqx_exproto_pb:code_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
| {error, error_response()}.
subscribe(Req = #{conn := Conn, topic := Topic, qos := Qos}, Md) when
?IS_QOS(Qos)
->
@ -165,7 +170,7 @@ subscribe(Req, Md) ->
-spec unsubscribe(emqx_exproto_pb:unsubscribe_request(), grpc:metadata()) ->
{ok, emqx_exproto_pb:code_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
| {error, error_response()}.
unsubscribe(Req = #{conn := Conn, topic := Topic}, Md) ->
?SLOG(debug, #{
msg => "recv_grpc_function_call",

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_gateway_exproto, [
{description, "ExProto Gateway"},
{vsn, "0.1.5"},
{vsn, "0.1.6"},
{registered, []},
{applications, [kernel, stdlib, grpc, emqx, emqx_gateway]},
{env, []},

View File

@ -3,7 +3,7 @@
{description, "LwM2M Gateway"},
{vsn, "0.1.5"},
{registered, []},
{applications, [kernel, stdlib, emqx, emqx_gateway, emqx_gateway_coap]},
{applications, [kernel, stdlib, emqx, emqx_gateway, emqx_gateway_coap, xmerl]},
{env, []},
{modules, []},
{licenses, ["Apache 2.0"]},

View File

@ -43,6 +43,8 @@
-define(LWM2M_OBJECT_DEF_TAB, lwm2m_object_def_tab).
-define(LWM2M_OBJECT_NAME_TO_ID_TAB, lwm2m_object_name_to_id_tab).
-type xmlElement() :: tuple().
-record(state, {}).
-elvis([{elvis_style, atom_naming_convention, disable}]).
@ -59,7 +61,7 @@
start_link(XmlDir) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [XmlDir], []).
-spec find_objectid(integer()) -> {error, no_xml_definition} | xmerl:xmlElement().
-spec find_objectid(integer()) -> {error, no_xml_definition} | xmlElement().
find_objectid(ObjectId) ->
ObjectIdInt =
case is_list(ObjectId) of
@ -71,7 +73,7 @@ find_objectid(ObjectId) ->
[{_ObjectId, Xml}] -> Xml
end.
-spec find_name(string()) -> {error, no_xml_definition} | xmerl:xmlElement().
-spec find_name(string()) -> {error, no_xml_definition} | xmlElement().
find_name(Name) ->
NameBinary =
case is_list(Name) of

View File

@ -32,6 +32,8 @@
lookup_topic_id/2
]).
-export_type([registry/0]).
-define(PKEY(Id), {mqttsn, predef_topics, Id}).
-type registry() :: #{

View File

@ -72,13 +72,13 @@
%% Piggyback
piggyback :: single | multiple,
%% Limiter
limiter :: maybe(emqx_limiter:limiter()),
limiter :: maybe(emqx_htb_limiter:limiter()),
%% Limit Timer
limit_timer :: maybe(reference()),
%% Parse State
parse_state :: emqx_ocpp_frame:parse_state(),
%% Serialize options
serialize :: emqx_ocpp_frame:serialize_opts(),
serialize :: emqx_ocpp_frame:serialize_options(),
%% Channel
channel :: emqx_ocpp_channel:channel(),
%% GC State
@ -268,7 +268,7 @@ init_state_and_channel([Req, Opts, _WsOpts], _State = undefined) ->
ws_cookie => WsCookie,
conn_mod => ?MODULE
},
Limiter = undeined,
Limiter = undefined,
ActiveN = emqx_gateway_utils:active_n(Opts),
Piggyback = emqx_utils_maps:deep_get([websocket, piggyback], Opts, multiple),
ParseState = emqx_ocpp_frame:initial_parse_state(#{}),

View File

@ -39,9 +39,12 @@
-export_type([
parse_state/0,
parse_result/0,
frame/0
frame/0,
serialize_options/0
]).
-type serialize_options() :: emqx_gateway_frame:serialize_options().
-dialyzer({nowarn_function, [format/1]}).
-spec initial_parse_state(map()) -> parse_state().
@ -114,7 +117,7 @@ parse(
},
<<>>, Parser}.
-spec serialize_opts() -> emqx_gateway_frame:serialize_options().
-spec serialize_opts() -> serialize_options().
serialize_opts() ->
#{}.

View File

@ -69,7 +69,7 @@
%% Channel State
conn_state :: conn_state(),
%% Heartbeat
heartbeat :: emqx_stomp_heartbeat:heartbeat(),
heartbeat :: undefined | emqx_stomp_heartbeat:heartbeat(),
%% Subscriptions
subscriptions = [],
%% Timer

View File

@ -27,6 +27,8 @@
interval/2
]).
-export_type([heartbeat/0]).
-record(heartbeater, {interval, statval, repeat}).
-type name() :: incoming | outgoing.

View File

@ -130,7 +130,7 @@ ensure_username(Field) ->
%% ===================================================================
callback_mode() -> always_sync.
-spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}.
-spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}.
on_start(
InstId,
#{

View File

@ -33,7 +33,7 @@
-define(POOL_NAME_SUFFIX, "bind_worker").
%% ===================================================================
-spec on_start(binary(), hoconsc:config(), proplists:proplist(), map()) ->
-spec on_start(binary(), hocon:config(), proplists:proplist(), map()) ->
{ok, binary(), map()} | {error, _}.
on_start(InstId, #{method := #{bind_password := _}} = Config, Options, State) ->
PoolName = pool_name(InstId),

View File

@ -24,13 +24,15 @@
runtime_tools,
redbug,
xmerl,
%% has no application/2 callback
{hocon, load},
telemetry,
{opentelemetry, load},
{opentelemetry_api, load},
{opentelemetry_experimental, load},
{opentelemetry_api_experimental, load},
{opentelemetry_exporter, load}
observer_cli,
covertool,
tools,
observer,
{system_monitor, load},
jq
],
%% must always be of type `load'
common_business_apps =>
@ -38,11 +40,6 @@
emqx,
emqx_conf,
esasl,
observer_cli,
tools,
covertool,
%% started by emqx_machine
system_monitor,
emqx_utils,
emqx_durable_storage,
emqx_http_lib,
@ -84,9 +81,7 @@
emqx_plugins,
emqx_opentelemetry,
quicer,
bcrypt,
jq,
observer
bcrypt
],
%% must always be of type `load'
ee_business_apps =>

View File

@ -43,7 +43,8 @@
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec run() -> {ok, timer:time()}.
%% @doc Run global garbage collection and return the time (in milliseconds) spent.
-spec run() -> {ok, non_neg_integer()}.
run() -> gen_server:call(?MODULE, run, infinity).
-spec stop() -> ok.

View File

@ -6,7 +6,13 @@
{vsn, "0.2.17"},
{modules, []},
{registered, []},
{applications, [kernel, stdlib, emqx_ctl]},
{applications, [kernel, stdlib, emqx_ctl, covertool]},
%% system_monitor is loaded but not booted,
%% emqx_machine.erl makes the decision when to start
%% the app after certain config injection.
%% it's a included_application because otherwise dialyzer
%% would report unknown functions
{included_applications, [system_monitor]},
{mod, {emqx_machine_app, []}},
{env, []},
{licenses, ["Apache-2.0"]},

View File

@ -47,9 +47,10 @@ start() ->
os:set_signal(sigterm, handle)
end,
ok = set_backtrace_depth(),
start_sysmon(),
ok = start_sysmon(),
configure_shard_transports(),
set_mnesia_extra_diagnostic_checks(),
emqx_otel_app:configure_otel_deps(),
ekka:start(),
ok.

View File

@ -36,9 +36,6 @@
%% If any of these applications crash, the entire EMQX node shuts down:
-define(BASIC_PERMANENT_APPS, [mria, ekka, esockd, emqx]).
%% These apps should NOT be (re)started automatically:
-define(EXCLUDED_APPS, [system_monitor, observer_cli, jq]).
%% These apps are optional, they may or may not be present in the
%% release, depending on the build flags:
-define(OPTIONAL_APPS, [bcrypt, observer]).
@ -69,9 +66,7 @@ stop_apps() ->
?SLOG(notice, #{msg => "stopping_emqx_apps"}),
_ = emqx_alarm_handler:unload(),
ok = emqx_conf_app:unset_config_loaded(),
lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())),
%% Mute otel deps application.
ok = emqx_otel_app:stop_deps().
lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())).
%% Those port apps are terminated after the main apps
%% Don't need to stop when reboot.
@ -159,7 +154,7 @@ basic_reboot_apps() ->
excluded_apps() ->
%% Optional apps _should_ be (re)started automatically, but only
%% when they are found in the release:
?EXCLUDED_APPS ++ [App || App <- ?OPTIONAL_APPS, not is_app(App)].
[App || App <- ?OPTIONAL_APPS, not is_app(App)].
is_app(Name) ->
case application:load(Name) of

Some files were not shown because too many files have changed in this diff Show More