Merge remote-tracking branch 'origin/develop'

This commit is contained in:
zhanghongtong 2019-06-22 09:24:41 +08:00
commit 411f8a0ec7
12 changed files with 115 additions and 33 deletions

1
.gitignore vendored
View File

@ -40,3 +40,4 @@ xrefr
erlang.mk
*.coverdata
etc/emqx.conf.rendered
Mnesia.*/

View File

@ -3,16 +3,43 @@
REBAR_GIT_CLONE_OPTIONS += --depth 1
export REBAR_GIT_CLONE_OPTIONS
SUITES_FILES := $(shell find test -name '*_SUITE.erl')
CT_SUITES := $(foreach value,$(SUITES_FILES),$(shell val=$$(basename $(value) .erl); echo $${val%_*}))
CT_NODE_NAME = emqxct@127.0.0.1
.PHONY: cover
run:
@echo $(CT_TEST_SUITES)
RUN_NODE_NAME = emqxdebug@127.0.0.1
.PHONY: run
run: run_setup
@rebar3 as test get-deps
@rebar3 as test auto --name $(RUN_NODE_NAME) --script test/run_emqx.escript
.PHONY: run_setup
run_setup:
@erl -noshell -eval \
"{ok, [[HOME]]} = init:get_argument(home), \
FilePath = HOME ++ \"/.config/rebar3/rebar.config\", \
case file:consult(FilePath) of \
{ok, Term} -> \
NewTerm = case lists:keyfind(plugins, 1, Term) of \
false -> [{plugins, [rebar3_auto]} | Term]; \
{plugins, OldPlugins} -> \
NewPlugins0 = OldPlugins -- [rebar3_auto], \
NewPlugins = [rebar3_auto | NewPlugins0], \
lists:keyreplace(plugins, 1, Term, {plugins, NewPlugins}) \
end, \
ok = file:write_file(FilePath, [io_lib:format(\"~p.\n\", [I]) || I <- NewTerm]); \
_ -> \
NewTerm=[{plugins, [rebar3_auto]}], \
ok = file:write_file(FilePath, [io_lib:format(\"~p.\n\", [I]) || I <- NewTerm]) \
end, \
halt(0)."
.PHONY: shell
shell:
@rebar3 as test auto
compile:
@rebar3 compile
@ -89,5 +116,6 @@ gen-clean:
.PHONY: distclean
distclean: gen-clean
@rm -rf Mnesia.*
@rm -rf _build cover deps logs log data
@rm -f rebar.lock compile_commands.json cuttlefish erl_crash.dump

View File

@ -132,7 +132,8 @@
descr :: string(),
vendor :: string(),
active = false :: boolean(),
info :: map()
info :: map(),
type :: atom()
}).
%%--------------------------------------------------------------------

View File

@ -1,12 +1,12 @@
{deps,
[ {jsx, "2.9.0"} % hex
, {cowboy, "2.6.1"} % hex
, {gproc, "0.8.0"} % hex
, {ekka, "0.5.6"} % hex
, {replayq, "0.1.1"} %hex
, {esockd, "5.5.0"} %hex
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}}
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
[{jsx, "2.9.0"}, % hex
{cowboy, "2.6.1"}, % hex
{gproc, "0.8.0"}, % hex
{ekka, "0.5.6"}, % hex
{replayq, "0.1.1"}, %hex
{esockd, "5.5.0"}, %hex
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}},
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
]}.
{edoc_opts, [{preprocess, true}]}.
@ -29,9 +29,9 @@
{profiles,
[{test,
[{deps,
[ {meck, "0.8.13"} % hex
, {bbmustache, "1.7.0"} % hex
, {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "v1.1.1"}}}
[{meck, "0.8.13"}, % hex
{bbmustache, "1.7.0"}, % hex
{emqx_ct_helpers, "1.1.3"} % hex
]}
]}
]}.

View File

@ -29,7 +29,10 @@
-> {ok, emqx_types:credentials()} | {error, term()}).
authenticate(Credentials) ->
case emqx_hooks:run_fold('client.authenticate', [], init_auth_result(Credentials)) of
#{auth_result := success} = NewCredentials ->
#{auth_result := success, anonymous := true} = NewCredentials ->
emqx_metrics:inc('auth.mqtt.anonymous'),
{ok, NewCredentials};
#{auth_result := success} = NewCredentials ->
{ok, NewCredentials};
NewCredentials ->
{error, maps:get(auth_result, NewCredentials, unknown_error)}

View File

@ -98,7 +98,7 @@ handle_event({set_alarm, Alarm = {AlarmId, AlarmDesc}}, State) ->
?LOG(warning, "~p set", [Alarm]),
case encode_alarm(Alarm) of
{ok, Json} ->
emqx_broker:safe_publish(alarm_msg(topic(alert, maybe_to_binary(AlarmId)), Json));
emqx_broker:safe_publish(alarm_msg(topic(alert), Json));
{error, Reason} ->
?LOG(error, "Failed to encode alarm: ~p", [Reason])
end,
@ -106,7 +106,12 @@ handle_event({set_alarm, Alarm = {AlarmId, AlarmDesc}}, State) ->
{ok, State};
handle_event({clear_alarm, AlarmId}, State) ->
?LOG(notice, "~p clear", [AlarmId]),
emqx_broker:safe_publish(alarm_msg(topic(clear, maybe_to_binary(AlarmId)), <<"">>)),
case encode_alarm({AlarmId, undefined}) of
{ok, Json} ->
emqx_broker:safe_publish(alarm_msg(topic(clear), Json));
{error, Reason} ->
?LOG(error, "Failed to encode alarm: ~p", [Reason])
end,
clear_alarm_(AlarmId),
{ok, State};
handle_event(_, State) ->
@ -142,19 +147,21 @@ encode_alarm({AlarmId, #alarm{severity = Severity,
{title, iolist_to_binary(Title)},
{summary, iolist_to_binary(Summary)},
{ts, emqx_time:now_secs(Ts)}]}]);
encode_alarm({AlarmId, undefined}) ->
emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)}]);
encode_alarm({AlarmId, AlarmDesc}) ->
emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)},
{desc, maybe_to_binary(AlarmDesc)}]).
emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)},
{description, maybe_to_binary(AlarmDesc)}]).
alarm_msg(Topic, Payload) ->
Msg = emqx_message:make(?MODULE, Topic, Payload),
emqx_message:set_headers(#{'Content-Type' => <<"application/json">>},
emqx_message:set_flag(sys, Msg)).
topic(alert, AlarmId) ->
emqx_topic:systop(<<"alarms/", AlarmId/binary, "/alert">>);
topic(clear, AlarmId) ->
emqx_topic:systop(<<"alarms/", AlarmId/binary, "/clear">>).
topic(alert) ->
emqx_topic:systop(<<"alarms/alert">>);
topic(clear) ->
emqx_topic:systop(<<"alarms/clear">>).
maybe_to_binary(Data) when is_binary(Data) ->
Data;

View File

@ -132,6 +132,10 @@
{counter, 'messages.forward'} % Messages forward
]).
-define(MQTT_METRICS, [
{counter, 'auth.mqtt.anonymous'}
]).
-record(state, {next_idx = 1}).
-record(metric, {name, type, idx}).
@ -355,7 +359,7 @@ init([]) ->
Metric = #metric{name = Name, type = Type, idx = reserved_idx(Name)},
true = ets:insert(?TAB, Metric),
ok = counters:put(CRef, Idx, 0)
end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS),
end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS ++ ?MQTT_METRICS),
{ok, #state{next_idx = ?RESERVED_IDX + 1}, hibernate}.
handle_call({create, Type, Name}, _From, State = #state{next_idx = ?MAX_SIZE}) ->
@ -446,4 +450,5 @@ reserved_idx('messages.retained') -> 48;
reserved_idx('messages.dropped') -> 49;
reserved_idx('messages.expired') -> 50;
reserved_idx('messages.forward') -> 51;
reserved_idx('auth.mqtt.anonymous') -> 52;
reserved_idx(_) -> undefined.

View File

@ -151,20 +151,20 @@ stop_plugins(Names) ->
-spec(list() -> [emqx_types:plugin()]).
list() ->
StartedApps = names(started_app),
lists:map(fun({Name, _, _}) ->
Plugin = plugin(Name),
lists:map(fun({Name, _, [Type| _]}) ->
Plugin = plugin(Name, Type),
case lists:member(Name, StartedApps) of
true -> Plugin#plugin{active = true};
false -> Plugin
end
end, lists:sort(ekka_boot:all_module_attributes(emqx_plugin))).
plugin(AppName) ->
plugin(AppName, Type) ->
case application:get_all_key(AppName) of
{ok, Attrs} ->
Ver = proplists:get_value(vsn, Attrs, "0"),
Descr = proplists:get_value(description, Attrs, ""),
#plugin{name = AppName, version = Ver, descr = Descr};
#plugin{name = AppName, version = Ver, descr = Descr, type = plugin_type(Type)};
undefined -> error({plugin_not_found, AppName})
end.
@ -316,3 +316,10 @@ write_loaded(AppNames) ->
?LOG(error, "Open File ~p Error: ~p", [File, Error]),
{error, Error}
end.
plugin_type(auth) -> auth;
plugin_type(protocol) -> protocol;
plugin_type(backend) -> backend;
plugin_type(bridge) -> bridge;
plugin_type(_) -> feature.

View File

@ -221,7 +221,11 @@ websocket_handle(Frame, State)
{ok, ensure_stats_timer(State)};
websocket_handle({FrameType, _}, State)
when FrameType =:= ping; FrameType =:= pong ->
{ok, ensure_stats_timer(State)}.
{ok, ensure_stats_timer(State)};
%% According to mqtt spec[https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901285]
websocket_handle({_OtherFrameType, _}, State) ->
?LOG(error, "Frame error: Other type of data frame"),
shutdown(other_frame_type, State).
websocket_info({call, From, info}, State) ->
gen_server:reply(From, info(State)),

View File

@ -62,8 +62,8 @@ t_alarm_handler(_) ->
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?RC_SUCCESS), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5),
Topic1 = emqx_topic:systop(<<"alarms/alarm_for_test/alert">>),
Topic2 = emqx_topic:systop(<<"alarms/alarm_for_test/clear">>),
Topic1 = emqx_topic:systop(<<"alarms/alert">>),
Topic2 = emqx_topic:systop(<<"alarms/clear">>),
SubOpts = #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0},
emqx_client_sock:send(Sock,
raw_send_serialize(

View File

@ -31,6 +31,7 @@
all() ->
[ t_ws_connect_api
, t_ws_auth_failure
, t_ws_other_type_frame
].
init_per_suite(Config) ->
@ -71,6 +72,18 @@ t_ws_connect_api(_Config) ->
{close, _} = rfc6455_client:close(WS),
ok.
t_ws_other_type_frame(_Config) ->
WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()),
{ok, _} = rfc6455_client:open(WS),
ok = rfc6455_client:send_binary(WS, raw_send_serialize(?CLIENT)),
{binary, Bin} = rfc6455_client:recv(WS),
Connack = ?CONNACK_PACKET(?CONNACK_ACCEPT),
{ok, Connack, <<>>, _} = raw_recv_pase(Bin),
rfc6455_client:send(WS, <<"testdata">>),
timer:sleep(1000),
?assertEqual(undefined, erlang:process_info(WS)),
ok.
raw_send_serialize(Packet) ->
emqx_frame:serialize(Packet).

13
test/run_emqx.escript Normal file
View File

@ -0,0 +1,13 @@
#!/usr/bin/env escript
main(_) ->
start().
start() ->
SpecEmqxConfig = fun(_) -> ok end,
start(SpecEmqxConfig).
start(SpecEmqxConfig) ->
SchemaPath = filename:join(["priv", "emqx.schema"]),
ConfPath = filename:join(["etc", "emqx.conf"]),
emqx_ct_helpers:start_app(emqx, SchemaPath, ConfPath, SpecEmqxConfig).