Merge with the enterprise edition

This commit is contained in:
Feng Lee 2017-12-07 16:24:48 +08:00
parent f1640f5b85
commit 6e5134a8b2
15 changed files with 88 additions and 2099 deletions

View File

@ -35,7 +35,7 @@ TEST_ERLC_OPTS += +'{parse_transform, lager_transform}'
EUNIT_OPTS = verbose EUNIT_OPTS = verbose
CT_SUITES = emqx emqx_mod emqx_lib emqx_topic emqx_trie emqx_mqueue emqx_inflight \ CT_SUITES = emqx emqx_mod emqx_lib emqx_topic emqx_trie emqx_mqueue emqx_inflight \
emqx_vm emqx_net emqx_protocol emqx_access emqx_config emqx_router emqx_vm emqx_net emqx_protocol emqx_access emqx_router
CT_OPTS = -cover test/ct.cover.spec -erl_args -name emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name emqxct@127.0.0.1

View File

@ -112,10 +112,10 @@ start_listener({Proto, ListenOn, Opts}) when Proto == http; Proto == ws ->
%% Start https listener %% Start https listener
start_listener({Proto, ListenOn, Opts}) when Proto == https; Proto == wss -> start_listener({Proto, ListenOn, Opts}) when Proto == https; Proto == wss ->
{ok, _} = mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqx_ws, handle_request, []}); {ok, _} = mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqx_ws, handle_request, []}).
start_listener({Proto, ListenOn, Opts}) when Proto == api -> % start_listener({Proto, ListenOn, Opts}) when Proto == api ->
{ok, _} = mochiweb:start_http('mqtt:api', ListenOn, Opts, emqx_http:http_handler()). % {ok, _} = mochiweb:start_http('mqtt:api', ListenOn, Opts, emqx_http:http_handler()).
start_listener(Proto, ListenOn, Opts) -> start_listener(Proto, ListenOn, Opts) ->
Env = lists:append(emqx:env(client, []), emqx:env(protocol, [])), Env = lists:append(emqx:env(client, []), emqx:env(protocol, [])),
@ -144,8 +144,8 @@ stop_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws ->
mochiweb:stop_http('mqtt:ws', ListenOn); mochiweb:stop_http('mqtt:ws', ListenOn);
stop_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss -> stop_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss ->
mochiweb:stop_http('mqtt:wss', ListenOn); mochiweb:stop_http('mqtt:wss', ListenOn);
stop_listener({Proto, ListenOn, _Opts}) when Proto == api -> % stop_listener({Proto, ListenOn, _Opts}) when Proto == api ->
mochiweb:stop_http('mqtt:api', ListenOn); % mochiweb:stop_http('mqtt:api', ListenOn);
stop_listener({Proto, ListenOn, _Opts}) -> stop_listener({Proto, ListenOn, _Opts}) ->
esockd:close(Proto, ListenOn). esockd:close(Proto, ListenOn).

View File

@ -37,7 +37,6 @@ start(_Type, _Args) ->
print_banner(), print_banner(),
ekka:start(), ekka:start(),
{ok, Sup} = emqx_sup:start_link(), {ok, Sup} = emqx_sup:start_link(),
ok = emqx_cli:load(),
ok = register_acl_mod(), ok = register_acl_mod(),
start_autocluster(), start_autocluster(),
register(emqx, self()), register(emqx, self()),

View File

@ -1,613 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_cli).
-author("Feng Lee <feng@emqtt.io>").
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include("emqx_cli.hrl").
-import(lists, [foreach/2]).
-import(proplists, [get_value/2]).
-export([load/0]).
-export([status/1, broker/1, cluster/1, users/1, clients/1, sessions/1,
routes/1, topics/1, subscriptions/1, plugins/1, bridges/1,
listeners/1, vm/1, mnesia/1, trace/1, acl/1]).
-define(PROC_INFOKEYS, [status,
memory,
message_queue_len,
total_heap_size,
heap_size,
stack_size,
reductions]).
-define(MAX_LIMIT, 10000).
-define(APP, emqx).
-spec(load() -> ok).
load() ->
Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)],
lists:foreach(fun(Cmd) -> emqx_ctl:register_cmd(Cmd, {?MODULE, Cmd}, []) end, Cmds),
emqx_cli_config:register_config().
is_cmd(Fun) ->
not lists:member(Fun, [init, load, module_info]).
%%--------------------------------------------------------------------
%% Commands
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% @doc Node status
status([]) ->
{InternalStatus, _ProvidedStatus} = init:get_status(),
?PRINT("Node ~p is ~p~n", [node(), InternalStatus]),
case lists:keysearch(?APP, 1, application:which_applications()) of
false ->
?PRINT("~s is not running~n", [?APP]);
{value, {?APP, _Desc, Vsn}} ->
?PRINT("~s ~s is running~n", [?APP, Vsn])
end;
status(_) ->
?PRINT_CMD("status", "Show broker status").
%%--------------------------------------------------------------------
%% @doc Query broker
broker([]) ->
Funs = [sysdescr, version, uptime, datetime],
foreach(fun(Fun) ->
?PRINT("~-10s: ~s~n", [Fun, emqx_broker:Fun()])
end, Funs);
broker(["stats"]) ->
foreach(fun({Stat, Val}) ->
?PRINT("~-20s: ~w~n", [Stat, Val])
end, emqx_stats:getstats());
broker(["metrics"]) ->
foreach(fun({Metric, Val}) ->
?PRINT("~-24s: ~w~n", [Metric, Val])
end, lists:sort(emqx_metrics:all()));
broker(["pubsub"]) ->
Pubsubs = supervisor:which_children(emqx_pubsub_sup:pubsub_pool()),
foreach(fun({{_, Id}, Pid, _, _}) ->
ProcInfo = erlang:process_info(Pid, ?PROC_INFOKEYS),
?PRINT("pubsub: ~w~n", [Id]),
foreach(fun({Key, Val}) ->
?PRINT(" ~-18s: ~w~n", [Key, Val])
end, ProcInfo)
end, lists:reverse(Pubsubs));
broker(_) ->
?USAGE([{"broker", "Show broker version, uptime and description"},
{"broker pubsub", "Show process_info of pubsub"},
{"broker stats", "Show broker statistics of clients, topics, subscribers"},
{"broker metrics", "Show broker metrics"}]).
%%--------------------------------------------------------------------
%% @doc Cluster with other nodes
cluster(["join", SNode]) ->
case ekka:join(ekka_node:parse_name(SNode)) of
ok ->
?PRINT_MSG("Join the cluster successfully.~n"),
cluster(["status"]);
ignore ->
?PRINT_MSG("Ignore.~n");
{error, Error} ->
?PRINT("Failed to join the cluster: ~p~n", [Error])
end;
cluster(["leave"]) ->
case ekka:leave() of
ok ->
?PRINT_MSG("Leave the cluster successfully.~n"),
cluster(["status"]);
{error, Error} ->
?PRINT("Failed to leave the cluster: ~p~n", [Error])
end;
cluster(["force-leave", SNode]) ->
case ekka:force_leave(ekka_node:parse_name(SNode)) of
ok ->
?PRINT_MSG("Remove the node from cluster successfully.~n"),
cluster(["status"]);
ignore ->
?PRINT_MSG("Ignore.~n");
{error, Error} ->
?PRINT("Failed to remove the node from cluster: ~p~n", [Error])
end;
cluster(["status"]) ->
?PRINT("Cluster status: ~p~n", [ekka_cluster:status()]);
cluster(_) ->
?USAGE([{"cluster join <Node>", "Join the cluster"},
{"cluster leave", "Leave the cluster"},
{"cluster force-leave <Node>","Force the node leave from cluster"},
{"cluster status", "Cluster status"}]).
%%--------------------------------------------------------------------
%% @doc Users usage
users(Args) -> emqx_auth_username:cli(Args).
acl(["reload"]) -> emqx_access_control:reload_acl();
acl(_) -> ?USAGE([{"acl reload", "reload etc/acl.conf"}]).
%%--------------------------------------------------------------------
%% @doc Query clients
clients(["list"]) ->
dump(mqtt_client);
clients(["show", ClientId]) ->
if_client(ClientId, fun print/1);
clients(["kick", ClientId]) ->
if_client(ClientId, fun(#mqtt_client{client_pid = Pid}) -> emqx_client:kick(Pid) end);
clients(_) ->
?USAGE([{"clients list", "List all clients"},
{"clients show <ClientId>", "Show a client"},
{"clients kick <ClientId>", "Kick out a client"}]).
if_client(ClientId, Fun) ->
case emqx_cm:lookup(bin(ClientId)) of
undefined -> ?PRINT_MSG("Not Found.~n");
Client -> Fun(Client)
end.
%%--------------------------------------------------------------------
%% @doc Sessions Command
sessions(["list"]) ->
dump(mqtt_local_session);
%% performance issue?
sessions(["list", "persistent"]) ->
lists:foreach(fun print/1, ets:match_object(mqtt_local_session, {'_', '_', false, '_'}));
%% performance issue?
sessions(["list", "transient"]) ->
lists:foreach(fun print/1, ets:match_object(mqtt_local_session, {'_', '_', true, '_'}));
sessions(["show", ClientId]) ->
case ets:lookup(mqtt_local_session, bin(ClientId)) of
[] -> ?PRINT_MSG("Not Found.~n");
[SessInfo] -> print(SessInfo)
end;
sessions(_) ->
?USAGE([{"sessions list", "List all sessions"},
{"sessions list persistent", "List all persistent sessions"},
{"sessions list transient", "List all transient sessions"},
{"sessions show <ClientId>", "Show a session"}]).
%%--------------------------------------------------------------------
%% @doc Routes Command
routes(["list"]) ->
Routes = emqx_router:dump(),
foreach(fun print/1, Routes);
routes(["show", Topic]) ->
Routes = lists:append(ets:lookup(mqtt_route, bin(Topic)),
ets:lookup(mqtt_local_route, bin(Topic))),
foreach(fun print/1, Routes);
routes(_) ->
?USAGE([{"routes list", "List all routes"},
{"routes show <Topic>", "Show a route"}]).
%%--------------------------------------------------------------------
%% @doc Topics Command
topics(["list"]) ->
lists:foreach(fun(Topic) -> ?PRINT("~s~n", [Topic]) end, emqx:topics());
topics(["show", Topic]) ->
print(mnesia:dirty_read(mqtt_route, bin(Topic)));
topics(_) ->
?USAGE([{"topics list", "List all topics"},
{"topics show <Topic>", "Show a topic"}]).
subscriptions(["list"]) ->
lists:foreach(fun(Subscription) ->
print(subscription, Subscription)
end, ets:tab2list(mqtt_subscription));
subscriptions(["show", ClientId]) ->
case emqx:subscriptions(bin(ClientId)) of
[] -> ?PRINT_MSG("Not Found.~n");
Subscriptions ->
[print(subscription, Sub) || Sub <- Subscriptions]
end;
subscriptions(["add", ClientId, Topic, QoS]) ->
if_valid_qos(QoS, fun(IntQos) ->
case emqx_sm:lookup_session(bin(ClientId)) of
undefined ->
?PRINT_MSG("Error: Session not found!");
#mqtt_session{sess_pid = SessPid} ->
{Topic1, Options} = emqx_topic:parse(bin(Topic)),
emqx_session:subscribe(SessPid, [{Topic1, [{qos, IntQos}|Options]}]),
?PRINT_MSG("ok~n")
end
end);
subscriptions(["del", ClientId, Topic]) ->
case emqx_sm:lookup_session(bin(ClientId)) of
undefined ->
?PRINT_MSG("Error: Session not found!");
#mqtt_session{sess_pid = SessPid} ->
emqx_session:unsubscribe(SessPid, [emqx_topic:parse(bin(Topic))]),
?PRINT_MSG("ok~n")
end;
subscriptions(_) ->
?USAGE([{"subscriptions list", "List all subscriptions"},
{"subscriptions show <ClientId>", "Show subscriptions of a client"},
{"subscriptions add <ClientId> <Topic> <QoS>", "Add a static subscription manually"},
{"subscriptions del <ClientId> <Topic>", "Delete a static subscription manually"}]).
%if_could_print(Tab, Fun) ->
% case mnesia:table_info(Tab, size) of
% Size when Size >= ?MAX_LIMIT ->
% ?PRINT("Could not list, too many ~ss: ~p~n", [Tab, Size]);
% _Size ->
% Keys = mnesia:dirty_all_keys(Tab),
% foreach(fun(Key) -> Fun(ets:lookup(Tab, Key)) end, Keys)
% end.
if_valid_qos(QoS, Fun) ->
try list_to_integer(QoS) of
Int when ?IS_QOS(Int) -> Fun(Int);
_ -> ?PRINT_MSG("QoS should be 0, 1, 2~n")
catch _:_ ->
?PRINT_MSG("QoS should be 0, 1, 2~n")
end.
plugins(["list"]) ->
foreach(fun print/1, emqx_plugins:list());
plugins(["load", Name]) ->
case emqx_plugins:load(list_to_atom(Name)) of
{ok, StartedApps} ->
?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name]);
{error, Reason} ->
?PRINT("load plugin error: ~p~n", [Reason])
end;
plugins(["unload", Name]) ->
case emqx_plugins:unload(list_to_atom(Name)) of
ok ->
?PRINT("Plugin ~s unloaded successfully.~n", [Name]);
{error, Reason} ->
?PRINT("unload plugin error: ~p~n", [Reason])
end;
plugins(_) ->
?USAGE([{"plugins list", "Show loaded plugins"},
{"plugins load <Plugin>", "Load plugin"},
{"plugins unload <Plugin>", "Unload plugin"}]).
%%--------------------------------------------------------------------
%% @doc Bridges command
bridges(["list"]) ->
foreach(fun({Node, Topic, _Pid}) ->
?PRINT("bridge: ~s--~s-->~s~n", [node(), Topic, Node])
end, emqx_bridge_sup_sup:bridges());
bridges(["options"]) ->
?PRINT_MSG("Options:~n"),
?PRINT_MSG(" qos = 0 | 1 | 2~n"),
?PRINT_MSG(" prefix = string~n"),
?PRINT_MSG(" suffix = string~n"),
?PRINT_MSG(" queue = integer~n"),
?PRINT_MSG("Example:~n"),
?PRINT_MSG(" qos=2,prefix=abc/,suffix=/yxz,queue=1000~n");
bridges(["start", SNode, Topic]) ->
case emqx_bridge_sup_sup:start_bridge(list_to_atom(SNode), list_to_binary(Topic)) of
{ok, _} -> ?PRINT_MSG("bridge is started.~n");
{error, Error} -> ?PRINT("error: ~p~n", [Error])
end;
bridges(["start", SNode, Topic, OptStr]) ->
Opts = parse_opts(bridge, OptStr),
case emqx_bridge_sup_sup:start_bridge(list_to_atom(SNode), list_to_binary(Topic), Opts) of
{ok, _} -> ?PRINT_MSG("bridge is started.~n");
{error, Error} -> ?PRINT("error: ~p~n", [Error])
end;
bridges(["stop", SNode, Topic]) ->
case emqx_bridge_sup_sup:stop_bridge(list_to_atom(SNode), list_to_binary(Topic)) of
ok -> ?PRINT_MSG("bridge is stopped.~n");
{error, Error} -> ?PRINT("error: ~p~n", [Error])
end;
bridges(_) ->
?USAGE([{"bridges list", "List bridges"},
{"bridges options", "Bridge options"},
{"bridges start <Node> <Topic>", "Start a bridge"},
{"bridges start <Node> <Topic> <Options>", "Start a bridge with options"},
{"bridges stop <Node> <Topic>", "Stop a bridge"}]).
parse_opts(Cmd, OptStr) ->
Tokens = string:tokens(OptStr, ","),
[parse_opt(Cmd, list_to_atom(Opt), Val)
|| [Opt, Val] <- [string:tokens(S, "=") || S <- Tokens]].
parse_opt(bridge, qos, Qos) ->
{qos, list_to_integer(Qos)};
parse_opt(bridge, suffix, Suffix) ->
{topic_suffix, bin(Suffix)};
parse_opt(bridge, prefix, Prefix) ->
{topic_prefix, bin(Prefix)};
parse_opt(bridge, queue, Len) ->
{max_queue_len, list_to_integer(Len)};
parse_opt(_Cmd, Opt, _Val) ->
?PRINT("Bad Option: ~s~n", [Opt]).
%%--------------------------------------------------------------------
%% @doc vm command
vm([]) ->
vm(["all"]);
vm(["all"]) ->
[vm([Name]) || Name <- ["load", "memory", "process", "io", "ports"]];
vm(["load"]) ->
[?PRINT("cpu/~-20s: ~s~n", [L, V]) || {L, V} <- emqx_vm:loads()];
vm(["memory"]) ->
[?PRINT("memory/~-17s: ~w~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()];
vm(["process"]) ->
foreach(fun({Name, Key}) ->
?PRINT("process/~-16s: ~w~n", [Name, erlang:system_info(Key)])
end, [{limit, process_limit}, {count, process_count}]);
vm(["io"]) ->
IoInfo = erlang:system_info(check_io),
foreach(fun(Key) ->
?PRINT("io/~-21s: ~w~n", [Key, get_value(Key, IoInfo)])
end, [max_fds, active_fds]);
vm(["ports"]) ->
foreach(fun({Name, Key}) ->
?PRINT("ports/~-16s: ~w~n", [Name, erlang:system_info(Key)])
end, [{count, port_count}, {limit, port_limit}]);
vm(_) ->
?USAGE([{"vm all", "Show info of Erlang VM"},
{"vm load", "Show load of Erlang VM"},
{"vm memory", "Show memory of Erlang VM"},
{"vm process", "Show process of Erlang VM"},
{"vm io", "Show IO of Erlang VM"},
{"vm ports", "Show Ports of Erlang VM"}]).
%%--------------------------------------------------------------------
%% @doc mnesia Command
mnesia([]) ->
mnesia:system_info();
mnesia(_) ->
?PRINT_CMD("mnesia", "Mnesia system info").
%%--------------------------------------------------------------------
%% @doc Trace Command
trace(["list"]) ->
foreach(fun({{Who, Name}, LogFile}) ->
?PRINT("trace ~s ~s -> ~s~n", [Who, Name, LogFile])
end, emqx_trace:all_traces());
trace(["client", ClientId, "off"]) ->
trace_off(client, ClientId);
trace(["client", ClientId, LogFile]) ->
trace_on(client, ClientId, LogFile);
trace(["topic", Topic, "off"]) ->
trace_off(topic, Topic);
trace(["topic", Topic, LogFile]) ->
trace_on(topic, Topic, LogFile);
trace(_) ->
?USAGE([{"trace list", "List all traces"},
{"trace client <ClientId> <LogFile>","Trace a client"},
{"trace client <ClientId> off", "Stop tracing a client"},
{"trace topic <Topic> <LogFile>", "Trace a topic"},
{"trace topic <Topic> off", "Stop tracing a Topic"}]).
trace_on(Who, Name, LogFile) ->
case emqx_trace:start_trace({Who, iolist_to_binary(Name)}, LogFile) of
ok ->
?PRINT("trace ~s ~s successfully.~n", [Who, Name]);
{error, Error} ->
?PRINT("trace ~s ~s error: ~p~n", [Who, Name, Error])
end.
trace_off(Who, Name) ->
case emqx_trace:stop_trace({Who, iolist_to_binary(Name)}) of
ok ->
?PRINT("stop tracing ~s ~s successfully.~n", [Who, Name]);
{error, Error} ->
?PRINT("stop tracing ~s ~s error: ~p.~n", [Who, Name, Error])
end.
%%--------------------------------------------------------------------
%% @doc Listeners Command
listeners([]) ->
foreach(fun({{Protocol, ListenOn}, Pid}) ->
Info = [{acceptors, esockd:get_acceptors(Pid)},
{max_clients, esockd:get_max_clients(Pid)},
{current_clients,esockd:get_current_clients(Pid)},
{shutdown_count, esockd:get_shutdown_count(Pid)}],
?PRINT("listener on ~s:~s~n", [Protocol, esockd:to_string(ListenOn)]),
foreach(fun({Key, Val}) ->
?PRINT(" ~-16s: ~w~n", [Key, Val])
end, Info)
end, esockd:listeners());
listeners(["restart", Proto, ListenOn]) ->
ListenOn1 = case string:tokens(ListenOn, ":") of
[Port] -> list_to_integer(Port);
[IP, Port] -> {IP, list_to_integer(Port)}
end,
case emqx:restart_listener({list_to_atom(Proto), ListenOn1, []}) of
{ok, _Pid} ->
io:format("Restart ~s listener on ~s successfully.~n", [Proto, ListenOn]);
{error, Error} ->
io:format("Failed to restart ~s listener on ~s, error:~p~n", [Proto, ListenOn, Error])
end;
listeners(["stop", Proto, ListenOn]) ->
ListenOn1 = case string:tokens(ListenOn, ":") of
[Port] -> list_to_integer(Port);
[IP, Port] -> {IP, list_to_integer(Port)}
end,
case emqx:stop_listener({list_to_atom(Proto), ListenOn1, []}) of
ok ->
io:format("Stop ~s listener on ~s successfully.~n", [Proto, ListenOn]);
{error, Error} ->
io:format("Failed to stop ~s listener on ~s, error:~p~n", [Proto, ListenOn, Error])
end;
listeners(_) ->
?USAGE([{"listeners", "List listeners"},
{"listeners restart <Proto> <Port>", "Restart a listener"},
{"listeners stop <Proto> <Port>", "Stop a listener"}]).
%%--------------------------------------------------------------------
%% Dump ETS
%%--------------------------------------------------------------------
dump(Table) ->
dump(Table, ets:first(Table)).
dump(_Table, '$end_of_table') ->
ok;
dump(Table, Key) ->
case ets:lookup(Table, Key) of
[Record] -> print(Record);
[] -> ok
end,
dump(Table, ets:next(Table, Key)).
print([]) ->
ok;
print(Routes = [#mqtt_route{topic = Topic} | _]) ->
Nodes = [atom_to_list(Node) || #mqtt_route{node = Node} <- Routes],
?PRINT("~s -> ~s~n", [Topic, string:join(Nodes, ",")]);
%% print(Subscriptions = [#mqtt_subscription{subid = ClientId} | _]) ->
%% TopicTable = [io_lib:format("~s:~w", [Topic, Qos])
%% || #mqtt_subscription{topic = Topic, qos = Qos} <- Subscriptions],
%% ?PRINT("~s -> ~s~n", [ClientId, string:join(TopicTable, ",")]);
%% print(Topics = [#mqtt_topic{}|_]) ->
%% foreach(fun print/1, Topics);
print(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) ->
?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n",
[Name, Ver, Descr, Active]);
print(#mqtt_client{client_id = ClientId, clean_sess = CleanSess, username = Username,
peername = Peername, connected_at = ConnectedAt}) ->
?PRINT("Client(~s, clean_sess=~s, username=~s, peername=~s, connected_at=~p)~n",
[ClientId, CleanSess, Username, emqx_net:format(Peername),
emqx_time:now_secs(ConnectedAt)]);
%% print(#mqtt_topic{topic = Topic, flags = Flags}) ->
%% ?PRINT("~s: ~s~n", [Topic, string:join([atom_to_list(F) || F <- Flags], ",")]);
print({route, Routes}) ->
foreach(fun print/1, Routes);
print({local_route, Routes}) ->
foreach(fun print/1, Routes);
print(#mqtt_route{topic = Topic, node = Node}) ->
?PRINT("~s -> ~s~n", [Topic, Node]);
print({Topic, Node}) ->
?PRINT("~s -> ~s~n", [Topic, Node]);
print({ClientId, _ClientPid, _Persistent, SessInfo}) ->
Data = lists:append(SessInfo, emqx_stats:get_session_stats(ClientId)),
InfoKeys = [clean_sess,
subscriptions,
max_inflight,
inflight_len,
mqueue_len,
mqueue_dropped,
awaiting_rel_len,
deliver_msg,
enqueue_msg,
created_at],
?PRINT("Session(~s, clean_sess=~s, subscriptions=~w, max_inflight=~w, inflight=~w, "
"mqueue_len=~w, mqueue_dropped=~w, awaiting_rel=~w, "
"deliver_msg=~w, enqueue_msg=~w, created_at=~w)~n",
[ClientId | [format(Key, get_value(Key, Data)) || Key <- InfoKeys]]).
print(subscription, {Sub, {share, _Share, Topic}}) when is_pid(Sub) ->
?PRINT("~p -> ~s~n", [Sub, Topic]);
print(subscription, {Sub, Topic}) when is_pid(Sub) ->
?PRINT("~p -> ~s~n", [Sub, Topic]);
print(subscription, {{SubId, SubPid}, {share, _Share, Topic}})
when is_binary(SubId), is_pid(SubPid) ->
?PRINT("~s~p -> ~s~n", [SubId, SubPid, Topic]);
print(subscription, {{SubId, SubPid}, Topic})
when is_binary(SubId), is_pid(SubPid) ->
?PRINT("~s~p -> ~s~n", [SubId, SubPid, Topic]);
print(subscription, {Sub, Topic, Props}) ->
print(subscription, {Sub, Topic}),
lists:foreach(fun({K, V}) when is_binary(V) ->
?PRINT(" ~-8s: ~s~n", [K, V]);
({K, V}) ->
?PRINT(" ~-8s: ~w~n", [K, V]);
(K) ->
?PRINT(" ~-8s: true~n", [K])
end, Props).
format(created_at, Val) ->
emqx_time:now_secs(Val);
format(_, Val) ->
Val.
bin(S) -> iolist_to_binary(S).

View File

@ -1,363 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module (emqx_cli_config).
-export ([register_config_cli/0,
register_config/0,
run/1,
set_usage/0,
all_cfgs/0,
get_cfg/2,
get_cfg/3,
read_config/1,
write_config/2]).
-define(APP, emqx).
-define(TAB, emqx_config).
register_config() ->
application:start(clique),
F = fun() -> ekka_mnesia:running_nodes() end,
clique:register_node_finder(F),
register_config_cli(),
create_config_tab().
create_config_tab() ->
case ets:info(?TAB, name) of
undefined ->
ets:new(?TAB, [named_table, public]),
{ok, PluginsEtcDir} = emqx:env(plugins_etc_dir),
Files = filelib:wildcard("*.conf", PluginsEtcDir),
lists:foreach(fun(File) ->
[FileName | _] = string:tokens(File, "."),
Configs = cuttlefish_conf:file(lists:concat([PluginsEtcDir, File])),
ets:insert(?TAB, {list_to_atom(FileName), Configs})
end, Files);
_ ->
ok
end.
read_config(App) ->
case ets:lookup(?TAB, App) of
[] -> [];
[{_, Value}] -> Value
end.
write_config(App, Terms) ->
ets:insert(?TAB, {App, Terms}).
run(Cmd) ->
clique:run(Cmd).
register_config_cli() ->
ok = clique_config:load_schema([code:priv_dir(?APP)], ?APP),
register_protocol_formatter(),
register_client_formatter(),
register_session_formatter(),
register_queue_formatter(),
register_lager_formatter(),
register_auth_config(),
register_protocol_config(),
register_connection_config(),
register_client_config(),
register_session_config(),
register_queue_config(),
register_broker_config(),
register_lager_config().
set_usage() ->
io:format("~-40s# ~-20s# ~-20s ~p~n", ["key", "value", "datatype", "app"]),
io:format("------------------------------------------------------------------------------------------------~n"),
lists:foreach(fun({Key, Val, Datatype, App}) ->
io:format("~-40s# ~-20s# ~-20s ~p~n", [Key, Val, Datatype, App])
end, all_cfgs()),
io:format("------------------------------------------------------------------------------------------------~n"),
io:format("Usage: set key=value --app=appname~n").
all_cfgs() ->
{Mappings, Mappings1} = lists:foldl(
fun({Key, {_, Map, _}}, {Acc, Acc1}) ->
Map1 = lists:map(fun(M) -> {cuttlefish_mapping:variable(M), Key} end, Map),
{Acc ++ Map, Acc1 ++ Map1}
end, {[], []}, ets:tab2list(clique_schema)),
lists:foldl(fun({Key, _}, Acc) ->
case lists:keyfind(cuttlefish_variable:tokenize(Key), 2, Mappings) of
false -> Acc;
Map ->
Datatype = format_datatype(cuttlefish_mapping:datatype(Map)),
App = proplists:get_value(cuttlefish_variable:tokenize(Key), Mappings1),
[{_, [Val0]}] = clique_config:show([Key], [{app, App}]),
Val = any_to_string(proplists:get_value(Key, Val0)),
[{Key, Val, Datatype, App} | Acc]
end
end, [],lists:sort(ets:tab2list(clique_config))).
get_cfg(App, Key) ->
get_cfg(App, Key, undefined).
get_cfg(App, Key, Def) ->
[{_, [Val0]}] = clique_config:show([Key], [{app, App}]),
proplists:get_value(Key, Val0, Def).
format_datatype(Value) ->
format_datatype(Value, "").
format_datatype([Head], Acc) when is_tuple(Head) ->
[Head1 | _] = erlang:tuple_to_list(Head),
lists:concat([Acc, Head1]);
format_datatype([Head], Acc) ->
lists:concat([Acc, Head]);
format_datatype([Head | Tail], Acc) when is_tuple(Head)->
[Head1 | _] = erlang:tuple_to_list(Head),
format_datatype(Tail, Acc ++ lists:concat([Head1, ", "]));
format_datatype([Head | Tail], Acc) ->
format_datatype(Tail, Acc ++ lists:concat([Head, ", "])).
%%--------------------------------------------------------------------
%% Auth/Acl
%%--------------------------------------------------------------------
register_auth_config() ->
ConfigKeys = ["mqtt.allow_anonymous",
"mqtt.acl_nomatch",
"mqtt.acl_file",
"mqtt.cache_acl"],
[clique:register_config(Key , fun auth_config_callback/2) || Key <- ConfigKeys],
ok = register_config_whitelist(ConfigKeys).
auth_config_callback([_, KeyStr], Value) ->
application:set_env(?APP, l2a(KeyStr), Value), " successfully\n".
%%--------------------------------------------------------------------
%% MQTT Protocol
%%--------------------------------------------------------------------
register_protocol_formatter() ->
ConfigKeys = ["max_clientid_len",
"max_packet_size",
"websocket_protocol_header",
"keepalive_backoff"],
[clique:register_formatter(["mqtt", Key], fun protocol_formatter_callback/2) || Key <- ConfigKeys].
protocol_formatter_callback([_, "websocket_protocol_header"], Params) ->
Params;
protocol_formatter_callback([_, Key], Params) ->
proplists:get_value(l2a(Key), Params).
register_protocol_config() ->
ConfigKeys = ["mqtt.max_clientid_len",
"mqtt.max_packet_size",
"mqtt.websocket_protocol_header",
"mqtt.keepalive_backoff"],
[clique:register_config(Key , fun protocol_config_callback/2) || Key <- ConfigKeys],
ok = register_config_whitelist(ConfigKeys).
protocol_config_callback([_AppStr, KeyStr], Value) ->
protocol_config_callback(protocol, l2a(KeyStr), Value).
protocol_config_callback(_App, websocket_protocol_header, Value) ->
application:set_env(?APP, websocket_protocol_header, Value),
" successfully\n";
protocol_config_callback(App, Key, Value) ->
{ok, Env} = emqx:env(App),
application:set_env(?APP, App, lists:keyreplace(Key, 1, Env, {Key, Value})),
" successfully\n".
%%--------------------------------------------------------------------
%% MQTT Connection
%%--------------------------------------------------------------------
register_connection_config() ->
ConfigKeys = ["mqtt.conn.force_gc_count"],
[clique:register_config(Key , fun connection_config_callback/2) || Key <- ConfigKeys],
ok = register_config_whitelist(ConfigKeys).
connection_config_callback([_, KeyStr0, KeyStr1], Value) ->
KeyStr = lists:concat([KeyStr0, "_", KeyStr1]),
application:set_env(?APP, l2a(KeyStr), Value),
" successfully\n".
%%--------------------------------------------------------------------
%% MQTT Client
%%--------------------------------------------------------------------
register_client_formatter() ->
ConfigKeys = ["max_publish_rate",
"idle_timeout",
"enable_stats"],
[clique:register_formatter(["mqtt", "client", Key], fun client_formatter_callback/2) || Key <- ConfigKeys].
client_formatter_callback([_, _, Key], Params) ->
proplists:get_value(list_to_atom(Key), Params).
register_client_config() ->
ConfigKeys = ["mqtt.client.max_publish_rate",
"mqtt.client.idle_timeout",
"mqtt.client.enable_stats"],
[clique:register_config(Key , fun client_config_callback/2) || Key <- ConfigKeys],
ok = register_config_whitelist(ConfigKeys).
client_config_callback([_, AppStr, KeyStr], Value) ->
client_config_callback(l2a(AppStr), l2a(KeyStr), Value).
client_config_callback(App, idle_timeout, Value) ->
{ok, Env} = emqx:env(App),
application:set_env(?APP, App, lists:keyreplace(client_idle_timeout, 1, Env, {client_idle_timeout, Value})),
" successfully\n";
client_config_callback(App, enable_stats, Value) ->
{ok, Env} = emqx:env(App),
application:set_env(?APP, App, lists:keyreplace(client_enable_stats, 1, Env, {client_enable_stats, Value})),
" successfully\n";
client_config_callback(App, Key, Value) ->
{ok, Env} = emqx:env(App),
application:set_env(?APP, App, lists:keyreplace(Key, 1, Env, {Key, Value})),
" successfully\n".
%%--------------------------------------------------------------------
%% session
%%--------------------------------------------------------------------
register_session_formatter() ->
ConfigKeys = ["max_subscriptions",
"upgrade_qos",
"max_inflight",
"retry_interval",
"max_awaiting_rel",
"await_rel_timeout",
"enable_stats",
"expiry_interval",
"ignore_loop_deliver"],
[clique:register_formatter(["mqtt", "session", Key], fun session_formatter_callback/2) || Key <- ConfigKeys].
session_formatter_callback([_, _, Key], Params) ->
proplists:get_value(list_to_atom(Key), Params).
register_session_config() ->
ConfigKeys = ["mqtt.session.max_subscriptions",
"mqtt.session.upgrade_qos",
"mqtt.session.max_inflight",
"mqtt.session.retry_interval",
"mqtt.session.max_awaiting_rel",
"mqtt.session.await_rel_timeout",
"mqtt.session.enable_stats",
"mqtt.session.expiry_interval",
"mqtt.session.ignore_loop_deliver"],
[clique:register_config(Key , fun session_config_callback/2) || Key <- ConfigKeys],
ok = register_config_whitelist(ConfigKeys).
session_config_callback([_, AppStr, KeyStr], Value) ->
session_config_callback(l2a(AppStr), l2a(KeyStr), Value).
session_config_callback(App, Key, Value) ->
{ok, Env} = emqx:env(App),
application:set_env(?APP, App, lists:keyreplace(Key, 1, Env, {Key, Value})),
" successfully\n".
l2a(List) -> list_to_atom(List).
%%--------------------------------------------------------------------
%% MQTT MQueue
%%--------------------------------------------------------------------
register_queue_formatter() ->
ConfigKeys = ["type",
"priority",
"max_length",
"low_watermark",
"high_watermark",
"store_qos0"],
[clique:register_formatter(["mqtt", "mqueue", Key], fun queue_formatter_callback/2) || Key <- ConfigKeys].
queue_formatter_callback([_, _, Key], Params) ->
proplists:get_value(list_to_atom(Key), Params).
register_queue_config() ->
ConfigKeys = ["mqtt.mqueue.type",
"mqtt.mqueue.priority",
"mqtt.mqueue.max_length",
"mqtt.mqueue.low_watermark",
"mqtt.mqueue.high_watermark",
"mqtt.mqueue.store_qos0"],
[clique:register_config(Key , fun queue_config_callback/2) || Key <- ConfigKeys],
ok = register_config_whitelist(ConfigKeys).
queue_config_callback([_, AppStr, KeyStr], Value) ->
queue_config_callback(l2a(AppStr), l2a(KeyStr), Value).
queue_config_callback(App, low_watermark, Value) ->
{ok, Env} = emqx:env(App),
application:set_env(?APP, App, lists:keyreplace(low_watermark, 1, Env, {low_watermark, Value})),
" successfully\n";
queue_config_callback(App, high_watermark, Value) ->
{ok, Env} = emqx:env(App),
application:set_env(?APP, App, lists:keyreplace(high_watermark, 1, Env, {high_watermark, Value})),
" successfully\n";
queue_config_callback(App, Key, Value) ->
{ok, Env} = emqx:env(App),
application:set_env(?APP, App, lists:keyreplace(Key, 1, Env, {Key, Value})),
" successfully\n".
%%--------------------------------------------------------------------
%% MQTT Broker
%%--------------------------------------------------------------------
register_broker_config() ->
ConfigKeys = ["mqtt.broker.sys_interval"],
[clique:register_config(Key , fun broker_config_callback/2) || Key <- ConfigKeys],
ok = register_config_whitelist(ConfigKeys).
broker_config_callback([_, KeyStr0, KeyStr1], Value) ->
KeyStr = lists:concat([KeyStr0, "_", KeyStr1]),
application:set_env(?APP, l2a(KeyStr), Value),
" successfully\n".
%%--------------------------------------------------------------------
%% MQTT Lager
%%--------------------------------------------------------------------
register_lager_formatter() ->
ConfigKeys = ["level"],
[clique:register_formatter(["log", "console", Key], fun lager_formatter_callback/2) || Key <- ConfigKeys].
lager_formatter_callback(_, Params) ->
proplists:get_value(lager_console_backend, Params).
register_lager_config() ->
ConfigKeys = ["log.console.level"],
[clique:register_config(Key , fun lager_config_callback/2) || Key <- ConfigKeys],
ok = register_config_whitelist(ConfigKeys).
lager_config_callback(_, Value) ->
lager:set_loglevel(lager_console_backend, Value),
" successfully\n".
register_config_whitelist(ConfigKeys) ->
clique:register_config_whitelist(ConfigKeys, ?APP).
%%--------------------------------------------------------------------
%% Inner Function
%%--------------------------------------------------------------------
any_to_string(I) when is_integer(I) ->
integer_to_list(I);
any_to_string(F) when is_float(F)->
float_to_list(F,[{decimals, 4}]);
any_to_string(A) when is_atom(A) ->
atom_to_list(A);
any_to_string(B) when is_binary(B) ->
binary_to_list(B);
any_to_string(L) when is_list(L) ->
L.

View File

@ -68,15 +68,6 @@ run([]) -> usage(), ok;
run(["help"]) -> usage(), ok; run(["help"]) -> usage(), ok;
run(["set"] = CmdS) when length(CmdS) =:= 1 ->
emqx_cli_config:set_usage(), ok;
run(["set" | _] = CmdS) ->
emqx_cli_config:run(["config" | CmdS]), ok;
run(["show" | _] = CmdS) ->
emqx_cli_config:run(["config" | CmdS]), ok;
run([CmdS|Args]) -> run([CmdS|Args]) ->
case lookup(list_to_atom(CmdS)) of case lookup(list_to_atom(CmdS)) of
[{Mod, Fun}] -> [{Mod, Fun}] ->

View File

@ -1,236 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc HTTP publish API and websocket client.
-module(emqx_http).
-author("Feng Lee <feng@emqtt.io>").
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-import(proplists, [get_value/2, get_value/3]).
-export([http_handler/0, handle_request/2, http_api/0, inner_handle_request/2]).
-include("emqx_rest.hrl").
-include("emqx_internal.hrl").
-record(state, {dispatch}).
http_handler() ->
APIs = http_api(),
State = #state{dispatch = dispatcher(APIs)},
{?MODULE, handle_request, [State]}.
http_api() ->
Attr = emqx_rest_api:module_info(attributes),
[{Regexp, Method, Function, Args} || {http_api, [{Regexp, Method, Function, Args}]} <- Attr].
%%--------------------------------------------------------------------
%% Handle HTTP Request
%%--------------------------------------------------------------------
handle_request(Req, State) ->
Path = Req:get(path),
case Path of
"/status" ->
handle_request("/status", Req, Req:get(method));
"/" ->
handle_request("/", Req, Req:get(method));
"/api/v2/auth" -> %%TODO: Security Issue!
handle_request(Path, Req, State);
_ ->
if_authorized(Req, fun() -> handle_request(Path, Req, State) end)
end.
inner_handle_request(Req, State) ->
Path = Req:get(path),
case Path of
"/api/v2/auth" -> handle_request(Path, Req, State);
_ -> if_authorized(Req, fun() -> handle_request(Path, Req, State) end)
end.
handle_request("/api/v2/" ++ Url, Req, #state{dispatch = Dispatch}) ->
Dispatch(Req, Url);
handle_request("/status", Req, Method) when Method =:= 'HEAD'; Method =:= 'GET' ->
{InternalStatus, _ProvidedStatus} = init:get_status(),
AppStatus = case lists:keysearch(emqx, 1, application:which_applications()) of
false -> not_running;
{value, _Val} -> running
end,
Status = io_lib:format("Node ~s is ~s~nemqx is ~s",
[node(), InternalStatus, AppStatus]),
Req:ok({"text/plain", iolist_to_binary(Status)});
handle_request("/", Req, Method) when Method =:= 'HEAD'; Method =:= 'GET' ->
respond(Req, 200, api_list());
handle_request(_, Req, #state{}) ->
respond(Req, 404, []).
dispatcher(APIs) ->
fun(Req, Url) ->
Method = Req:get(method),
case filter(APIs, Url, Method) of
[{Regexp, _Method, Function, FilterArgs}] ->
case params(Req) of
{error, Error1} ->
respond(Req, 200, Error1);
Params ->
case {check_params(Params, FilterArgs),
check_params_type(Params, FilterArgs)} of
{true, true} ->
{match, [MatchList]} = re:run(Url, Regexp, [global, {capture, all_but_first, list}]),
Args = lists:append([[Method, Params], MatchList]),
lager:debug("Mod:~p, Fun:~p, Args:~p", [emqx_rest_api, Function, Args]),
case catch apply(emqx_rest_api, Function, Args) of
{ok, Data} ->
respond(Req, 200, [{code, ?SUCCESS}, {result, Data}]);
{error, Error} ->
respond(Req, 200, Error);
{'EXIT', Reason} ->
lager:error("Execute API '~s' Error: ~p", [Url, Reason]),
respond(Req, 404, [])
end;
{false, _} ->
respond(Req, 200, [{code, ?ERROR7}, {message, <<"params error">>}]);
{_, false} ->
respond(Req, 200, [{code, ?ERROR8}, {message, <<"params type error">>}])
end
end;
_ ->
lager:error("No match Url:~p", [Url]),
respond(Req, 404, [])
end
end.
% %%--------------------------------------------------------------------
% %% Basic Authorization
% %%--------------------------------------------------------------------
if_authorized(Req, Fun) ->
case authorized(Req) of
true -> Fun();
false -> respond(Req, 401, [])
end.
authorized(Req) ->
case Req:get_header_value("Authorization") of
undefined ->
false;
"Basic " ++ BasicAuth ->
{Username, Password} = user_passwd(BasicAuth),
case emq_mgmt:check_user(Username, Password) of
ok ->
true;
{error, Reason} ->
lager:error("HTTP Auth failure: username=~s, reason=~p", [Username, Reason]),
false
end
end.
user_passwd(BasicAuth) ->
list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)).
respond(Req, 401, Data) ->
Req:respond({401, [{"WWW-Authenticate", "Basic Realm=\"emqx control center\""}], Data});
respond(Req, 404, Data) ->
Req:respond({404, [{"Content-Type", "text/plain"}], Data});
respond(Req, 200, Data) ->
Req:respond({200, [{"Content-Type", "application/json"}], to_json(Data)});
respond(Req, Code, Data) ->
Req:respond({Code, [{"Content-Type", "text/plain"}], Data}).
filter(APIs, Url, Method) ->
lists:filter(fun({Regexp, Method1, _Function, _Args}) ->
case re:run(Url, Regexp, [global, {capture, all_but_first, list}]) of
{match, _} -> Method =:= Method1;
_ -> false
end
end, APIs).
params(Req) ->
Method = Req:get(method),
case Method of
'GET' ->
mochiweb_request:parse_qs(Req);
_ ->
case Req:recv_body() of
<<>> -> [];
undefined -> [];
Body ->
case jsx:is_json(Body) of
true -> jsx:decode(Body);
false ->
lager:error("Body:~p", [Body]),
{error, [{code, ?ERROR9}, {message, <<"Body not json">>}]}
end
end
end.
check_params(_Params, Args) when Args =:= [] ->
true;
check_params(Params, Args)->
not lists:any(fun({Item, _Type}) -> undefined =:= proplists:get_value(Item, Params) end, Args).
check_params_type(_Params, Args) when Args =:= [] ->
true;
check_params_type(Params, Args) ->
not lists:any(fun({Item, Type}) ->
Val = proplists:get_value(Item, Params),
case Type of
int -> not is_integer(Val);
binary -> not is_binary(Val);
bool -> not is_boolean(Val)
end
end, Args).
to_json([]) -> <<"[]">>;
to_json(Data) -> iolist_to_binary(mochijson2:encode(Data)).
api_list() ->
[{paths, [<<"api/v2/management/nodes">>,
<<"api/v2/management/nodes/{node_name}">>,
<<"api/v2/monitoring/nodes">>,
<<"api/v2/monitoring/nodes/{node_name}">>,
<<"api/v2/monitoring/listeners">>,
<<"api/v2/monitoring/listeners/{node_name}">>,
<<"api/v2/monitoring/metrics/">>,
<<"api/v2/monitoring/metrics/{node_name}">>,
<<"api/v2/monitoring/stats">>,
<<"api/v2/monitoring/stats/{node_name}">>,
<<"api/v2/nodes/{node_name}/clients">>,
<<"api/v2/nodes/{node_name}/clients/{clientid}">>,
<<"api/v2/clients/{clientid}">>,
<<"api/v2/clients/{clientid}/clean_acl_cache">>,
<<"api/v2/nodes/{node_name}/sessions">>,
<<"api/v2/nodes/{node_name}/sessions/{clientid}">>,
<<"api/v2/sessions/{clientid}">>,
<<"api/v2/nodes/{node_name}/subscriptions">>,
<<"api/v2/nodes/{node_name}/subscriptions/{clientid}">>,
<<"api/v2/subscriptions/{clientid}">>,
<<"api/v2/routes">>,
<<"api/v2/routes/{topic}">>,
<<"api/v2/mqtt/publish">>,
<<"api/v2/mqtt/subscribe">>,
<<"api/v2/mqtt/unsubscribe">>,
<<"api/v2/nodes/{node_name}/plugins">>,
<<"api/v2/nodes/{node_name}/plugins/{plugin_name}">>,
<<"api/v2/configs/{app}">>,
<<"api/v2/nodes/{node_name}/configs/{app}">>]}].

View File

@ -1,544 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt).
-author("Feng Lee <feng@emqtt.io>").
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include("emqx_internal.hrl").
-include("emqx_rest.hrl").
-include_lib("stdlib/include/qlc.hrl").
-record(mqtt_admin, {username, password, tags}).
-define(EMPTY_KEY(Key), ((Key == undefined) orelse (Key == <<>>))).
-import(proplists, [get_value/2]).
-export([brokers/0, broker/1, metrics/0, metrics/1, stats/1, stats/0,
plugins/0, plugins/1, listeners/0, listener/1, nodes_info/0, node_info/1]).
-export([plugin_list/1, plugin_unload/2, plugin_load/2]).
-export([client_list/4, session_list/4, route_list/3, subscription_list/4, alarm_list/0]).
-export([client/1, session/1, route/1, subscription/1]).
-export([query_table/4, lookup_table/3]).
-export([publish/1, subscribe/1, unsubscribe/1]).
-export([kick_client/1, clean_acl_cache/2]).
-export([modify_config/2, modify_config/3, modify_config/4, get_configs/0, get_config/1,
get_plugin_config/1, get_plugin_config/2, modify_plugin_config/2, modify_plugin_config/3]).
-export([add_user/3, check_user/2, user_list/0, lookup_user/1,
update_user/2, change_password/3, remove_user/1]).
-define(KB, 1024).
-define(MB, (1024*1024)).
-define(GB, (1024*1024*1024)).
brokers() ->
[{Node, broker(Node)} || Node <- ekka_mnesia:running_nodes()].
broker(Node) when Node =:= node() ->
emqx_broker:info();
broker(Node) ->
rpc_call(Node, broker, [Node]).
metrics() ->
[{Node, metrics(Node)} || Node <- ekka_mnesia:running_nodes()].
metrics(Node) when Node =:= node() ->
emqx_metrics:all();
metrics(Node) ->
rpc_call(Node, metrics, [Node]).
stats() ->
[{Node, stats(Node)} || Node <- ekka_mnesia:running_nodes()].
stats(Node) when Node =:= node() ->
emqx_stats:getstats();
stats(Node) ->
rpc_call(Node, stats, [Node]).
plugins() ->
[{Node, plugins(Node)} || Node <- ekka_mnesia:running_nodes()].
plugins(Node) when Node =:= node() ->
emqx_plugins:list(Node);
plugins(Node) ->
rpc_call(Node, plugins, [Node]).
listeners() ->
[{Node, listener(Node)} || Node <- ekka_mnesia:running_nodes()].
listener(Node) when Node =:= node() ->
lists:map(fun({{Protocol, ListenOn}, Pid}) ->
Info = [{acceptors, esockd:get_acceptors(Pid)},
{max_clients, esockd:get_max_clients(Pid)},
{current_clients,esockd:get_current_clients(Pid)},
{shutdown_count, esockd:get_shutdown_count(Pid)}],
{Protocol, ListenOn, Info}
end, esockd:listeners());
listener(Node) ->
rpc_call(Node, listener, [Node]).
nodes_info() ->
Running = mnesia:system_info(running_db_nodes),
Stopped = mnesia:system_info(db_nodes) -- Running,
DownNodes = lists:map(fun stop_node/1, Stopped),
[node_info(Node) || Node <- Running] ++ DownNodes.
node_info(Node) when Node =:= node() ->
CpuInfo = [{K, list_to_binary(V)} || {K, V} <- emqx_vm:loads()],
Memory = emqx_vm:get_memory(),
OtpRel = "R" ++ erlang:system_info(otp_release) ++ "/" ++ erlang:system_info(version),
[{name, node()},
{otp_release, list_to_binary(OtpRel)},
{memory_total, kmg(get_value(allocated, Memory))},
{memory_used, kmg(get_value(used, Memory))},
{process_available, erlang:system_info(process_limit)},
{process_used, erlang:system_info(process_count)},
{max_fds, get_value(max_fds, erlang:system_info(check_io))},
{clients, ets:info(mqtt_client, size)},
{node_status, 'Running'} | CpuInfo];
node_info(Node) ->
rpc_call(Node, node_info, [Node]).
stop_node(Node) ->
[{name, Node}, {node_status, 'Stopped'}].
%%--------------------------------------------------------
%% plugins
%%--------------------------------------------------------
plugin_list(Node) when Node =:= node() ->
emqx_plugins:list();
plugin_list(Node) ->
rpc_call(Node, plugin_list, [Node]).
plugin_load(Node, PluginName) when Node =:= node() ->
emqx_plugins:load(PluginName);
plugin_load(Node, PluginName) ->
rpc_call(Node, plugin_load, [Node, PluginName]).
plugin_unload(Node, PluginName) when Node =:= node() ->
emqx_plugins:unload(PluginName);
plugin_unload(Node, PluginName) ->
rpc_call(Node, plugin_unload, [Node, PluginName]).
%%--------------------------------------------------------
%% client
%%--------------------------------------------------------
client_list(Node, Key, PageNo, PageSize) when Node =:= node() ->
client_list(Key, PageNo, PageSize);
client_list(Node, Key, PageNo, PageSize) ->
rpc_call(Node, client_list, [Node, Key, PageNo, PageSize]).
client(ClientId) ->
lists:flatten([client_list(Node, ClientId, 1, 20) || Node <- ekka_mnesia:running_nodes()]).
%%--------------------------------------------------------
%% session
%%--------------------------------------------------------
session_list(Node, Key, PageNo, PageSize) when Node =:= node() ->
session_list(Key, PageNo, PageSize);
session_list(Node, Key, PageNo, PageSize) ->
rpc_call(Node, session_list, [Node, Key, PageNo, PageSize]).
session(ClientId) ->
lists:flatten([session_list(Node, ClientId, 1, 20) || Node <- ekka_mnesia:running_nodes()]).
%%--------------------------------------------------------
%% subscription
%%--------------------------------------------------------
subscription_list(Node, Key, PageNo, PageSize) when Node =:= node() ->
subscription_list(Key, PageNo, PageSize);
subscription_list(Node, Key, PageNo, PageSize) ->
rpc_call(Node, subscription_list, [Node, Key, PageNo, PageSize]).
subscription(Key) ->
lists:flatten([subscription_list(Node, Key, 1, 20) || Node <- ekka_mnesia:running_nodes()]).
%%--------------------------------------------------------
%% Routes
%%--------------------------------------------------------
route(Key) -> route_list(Key, 1, 20).
%%--------------------------------------------------------
%% alarm
%%--------------------------------------------------------
alarm_list() ->
emqx_alarm:get_alarms().
query_table(Qh, PageNo, PageSize, TotalNum) ->
Cursor = qlc:cursor(Qh),
case PageNo > 1 of
true -> qlc:next_answers(Cursor, (PageNo - 1) * PageSize);
false -> ok
end,
Rows = qlc:next_answers(Cursor, PageSize),
qlc:delete_cursor(Cursor),
[{totalNum, TotalNum},
{totalPage, total_page(TotalNum, PageSize)},
{result, Rows}].
total_page(TotalNum, PageSize) ->
case TotalNum rem PageSize of
0 -> TotalNum div PageSize;
_ -> (TotalNum div PageSize) + 1
end.
%%TODO: refactor later...
lookup_table(LookupFun, _PageNo, _PageSize) ->
Rows = LookupFun(),
Rows.
%%--------------------------------------------------------------------
%% mqtt
%%--------------------------------------------------------------------
publish({ClientId, Topic, Payload, Qos, Retain}) ->
case validate(topic, Topic) of
true ->
Msg = emqx_message:make(ClientId, Qos, Topic, Payload),
emqx:publish(Msg#mqtt_message{retain = Retain}),
ok;
false ->
{error, format_error(Topic, "validate topic: ${0} fail")}
end.
subscribe({ClientId, Topic, Qos}) ->
case validate(topic, Topic) of
true ->
case emqx_sm:lookup_session(ClientId) of
undefined ->
{error, format_error(ClientId, "Clientid: ${0} not found")};
#mqtt_session{sess_pid = SessPid} ->
emqx_session:subscribe(SessPid, [{Topic, [{qos, Qos}]}]),
ok
end;
false ->
{error, format_error(Topic, "validate topic: ${0} fail")}
end.
unsubscribe({ClientId, Topic}) ->
case validate(topic, Topic) of
true ->
case emqx_sm:lookup_session(ClientId) of
undefined ->
{error, format_error(ClientId, "Clientid: ${0} not found")};
#mqtt_session{sess_pid = SessPid} ->
emqx_session:unsubscribe(SessPid, [{Topic, []}]),
ok
end;
false ->
{error, format_error(Topic, "validate topic: ${0} fail")}
end.
% publish(Messages) ->
% lists:foldl(
% fun({ClientId, Topic, Payload, Qos, Retain}, {Success, Failed}) ->
% case validate(topic, Topic) of
% true ->
% Msg = emqx_message:make(ClientId, Qos, Topic, Payload),
% emqx:publish(Msg#mqtt_message{retain = Retain}),
% {[[{topic, Topic}]| Success], Failed};
% false ->
% {Success, [[{topic, Topic}]| Failed]}
% end
% end, {[], []}, Messages).
% subscribers(Subscribers) ->
% lists:foldl(
% fun({ClientId, Topic, Qos}, {Success, Failed}) ->
% case emqx_sm:lookup_session(ClientId) of
% undefined ->
% {Success, [[{client_id, ClientId}]|Failed]};
% #mqtt_session{sess_pid = SessPid} ->
% emqx_session:subscribe(SessPid, [{Topic, [{qos, Qos}]}]),
% {[[{client_id, ClientId}]| Success], Failed}
% end
% end,{[], []}, Subscribers).
% unsubscribers(UnSubscribers)->
% lists:foldl(
% fun({ClientId, Topic}, {Success, Failed}) ->
% case emqx_sm:lookup_session(ClientId) of
% undefined ->
% {Success, [[{client_id, ClientId}]|Failed]};
% #mqtt_session{sess_pid = SessPid} ->
% emqx_session:unsubscriber(SessPid, [{Topic, []}]),
% {[[{client_id, ClientId}]| Success], Failed}
% end
% end, {[], []}, UnSubscribers).
%%--------------------------------------------------------------------
%% manager API
%%--------------------------------------------------------------------
kick_client(ClientId) ->
Result = [kick_client(Node, ClientId) || Node <- ekka_mnesia:running_nodes()],
lists:any(fun(Item) -> Item =:= ok end, Result).
kick_client(Node, ClientId) when Node =:= node() ->
case emqx_cm:lookup(ClientId) of
undefined -> error;
#mqtt_client{client_pid = Pid}-> emqx_client:kick(Pid)
end;
kick_client(Node, ClientId) ->
rpc_call(Node, kick_client, [Node, ClientId]).
clean_acl_cache(ClientId, Topic) ->
Result = [clean_acl_cache(Node, ClientId, Topic) || Node <- ekka_mnesia:running_nodes()],
lists:any(fun(Item) -> Item =:= ok end, Result).
clean_acl_cache(Node, ClientId, Topic) when Node =:= node() ->
case emqx_cm:lookup(ClientId) of
undefined -> error;
#mqtt_client{client_pid = Pid}-> emqx_client:clean_acl_cache(Pid, Topic)
end;
clean_acl_cache(Node, ClientId, Topic) ->
rpc_call(Node, clean_acl_cache, [Node, ClientId, Topic]).
%%--------------------------------------------------------------------
%% Config ENV
%%--------------------------------------------------------------------
modify_config(App, Terms) ->
emqx_config:write(App, Terms).
modify_config(App, Key, Value) ->
Result = [modify_config(Node, App, Key, Value) || Node <- ekka_mnesia:running_nodes()],
lists:any(fun(Item) -> Item =:= ok end, Result).
modify_config(Node, App, Key, Value) when Node =:= node() ->
emqx_config:set(App, Key, Value);
modify_config(Node, App, Key, Value) ->
rpc_call(Node, modify_config, [Node, App, Key, Value]).
get_configs() ->
[{Node, get_config(Node)} || Node <- ekka_mnesia:running_nodes()].
get_config(Node) when Node =:= node()->
emqx_cli_config:all_cfgs();
get_config(Node) ->
rpc_call(Node, get_config, [Node]).
get_plugin_config(PluginName) ->
emqx_config:read(PluginName).
get_plugin_config(Node, PluginName) ->
rpc_call(Node, get_plugin_config, [PluginName]).
modify_plugin_config(PluginName, Terms) ->
emqx_config:write(PluginName, Terms).
modify_plugin_config(Node, PluginName, Terms) ->
rpc_call(Node, modify_plugin_config, [PluginName, Terms]).
%%--------------------------------------------------------------------
%% manager user API
%%--------------------------------------------------------------------
check_user(undefined, _) ->
{error, "Username undefined"};
check_user(_, undefined) ->
{error, "Password undefined"};
check_user(Username, Password) ->
case mnesia:dirty_read(mqtt_admin, Username) of
[#mqtt_admin{password = <<Salt:4/binary, Hash/binary>>}] ->
case Hash =:= md5_hash(Salt, Password) of
true -> ok;
false -> {error, "Password error"}
end;
[] ->
{error, "User not found"}
end.
add_user(Username, Password, Tag) ->
Admin = #mqtt_admin{username = Username,
password = hash(Password),
tags = Tag},
return(mnesia:transaction(fun add_user_/1, [Admin])).
add_user_(Admin = #mqtt_admin{username = Username}) ->
case mnesia:wread({mqtt_admin, Username}) of
[] -> mnesia:write(Admin);
[_] -> {error, [{code, ?ERROR13}, {message, <<"User already exist">>}]}
end.
user_list() ->
[row(Admin) || Admin <- ets:tab2list(mqtt_admin)].
lookup_user(Username) ->
Admin = mnesia:dirty_read(mqtt_admin, Username),
row(Admin).
update_user(Username, Params) ->
case mnesia:dirty_read({mqtt_admin, Username}) of
[] ->
{error, [{code, ?ERROR5}, {message, <<"User not found">>}]};
[User] ->
Admin = case proplists:get_value(<<"tags">>, Params) of
undefined -> User;
Tag -> User#mqtt_admin{tags = Tag}
end,
return(mnesia:transaction(fun() -> mnesia:write(Admin) end))
end.
remove_user(Username) ->
Trans = fun() ->
case lookup_user(Username) of
[] -> {error, [{code, ?ERROR5}, {message, <<"User not found">>}]};
_ -> mnesia:delete({mqtt_admin, Username})
end
end,
return(mnesia:transaction(Trans)).
change_password(Username, OldPwd, NewPwd) ->
Trans = fun() ->
case mnesia:wread({mqtt_admin, Username}) of
[Admin = #mqtt_admin{password = <<Salt:4/binary, Hash/binary>>}] ->
case Hash =:= md5_hash(Salt, OldPwd) of
true ->
mnesia:write(Admin#mqtt_admin{password = hash(NewPwd)});
false ->
{error, [{code, ?ERROR14}, {message, <<"OldPassword error">>}]}
end;
[] ->
{error, [{code, ?ERROR5}, {message, <<"User not found">>}]}
end
end,
return(mnesia:transaction(Trans)).
return({atomic, ok}) ->
ok;
return({atomic, Error}) ->
Error;
return({aborted, Reason}) ->
lager:error("Mnesia Transaction error:~p~n", [Reason]),
error.
row(#mqtt_admin{username = Username, tags = Tags}) ->
[{username, Username}, {tags, Tags}];
row([#mqtt_admin{username = Username, tags = Tags}]) ->
[{username, Username}, {tags, Tags}];
row([]) ->[].
%%--------------------------------------------------------------------
%% Internel Functions.
%%--------------------------------------------------------------------
rpc_call(Node, Fun, Args) ->
case rpc:call(Node, ?MODULE, Fun, Args) of
{badrpc, Reason} -> {error, Reason};
Res -> Res
end.
kmg(Byte) when Byte > ?GB ->
float(Byte / ?GB, "G");
kmg(Byte) when Byte > ?MB ->
float(Byte / ?MB, "M");
kmg(Byte) when Byte > ?KB ->
float(Byte / ?MB, "K");
kmg(Byte) ->
Byte.
float(F, S) ->
iolist_to_binary(io_lib:format("~.2f~s", [F, S])).
validate(qos, Qos) ->
(Qos >= ?QOS_0) and (Qos =< ?QOS_2);
validate(topic, Topic) ->
emqx_topic:validate({name, Topic}).
client_list(ClientId, PageNo, PageSize) when ?EMPTY_KEY(ClientId) ->
TotalNum = ets:info(mqtt_client, size),
Qh = qlc:q([R || R <- ets:table(mqtt_client)]),
query_table(Qh, PageNo, PageSize, TotalNum);
client_list(ClientId, PageNo, PageSize) ->
Fun = fun() -> ets:lookup(mqtt_client, ClientId) end,
lookup_table(Fun, PageNo, PageSize).
session_list(ClientId, PageNo, PageSize) when ?EMPTY_KEY(ClientId) ->
TotalNum = lists:sum([ets:info(Tab, size) || Tab <- [mqtt_local_session]]),
Qh = qlc:append([qlc:q([E || E <- ets:table(Tab)]) || Tab <- [mqtt_local_session]]),
query_table(Qh, PageNo, PageSize, TotalNum);
session_list(ClientId, PageNo, PageSize) ->
MP = {ClientId, '_', '_', '_'},
Fun = fun() -> lists:append([ets:match_object(Tab, MP) || Tab <- [mqtt_local_session]]) end,
lookup_table(Fun, PageNo, PageSize).
subscription_list(Key, PageNo, PageSize) when ?EMPTY_KEY(Key) ->
TotalNum = ets:info(mqtt_subproperty, size),
Qh = qlc:q([E || E <- ets:table(mqtt_subproperty)]),
query_table(Qh, PageNo, PageSize, TotalNum);
subscription_list(Key, PageNo, PageSize) ->
Fun = fun() -> ets:match_object(mqtt_subproperty, {{'_', {Key, '_'}}, '_'}) end,
lookup_table(Fun, PageNo, PageSize).
route_list(Topic, PageNo, PageSize) when ?EMPTY_KEY(Topic) ->
Tables = [mqtt_route],
TotalNum = lists:sum([ets:info(Tab, size) || Tab <- [mqtt_route, mqtt_local_route]]),
Qh = qlc:append([qlc:q([E || E <- ets:table(Tab)]) || Tab <- Tables]),
Data = query_table(Qh, PageNo, PageSize, TotalNum),
Route = get_value(result, Data),
LocalRoute = local_route_list(Topic, PageNo, PageSize),
lists:keyreplace(result, 1, Data, {result, lists:append(Route, LocalRoute)});
route_list(Topic, PageNo, PageSize) ->
Tables = [mqtt_route],
Fun = fun() -> lists:append([ets:lookup(Tab, Topic) || Tab <- Tables]) end,
Route = lookup_table(Fun, PageNo, PageSize),
LocalRoute = local_route_list(Topic, PageNo, PageSize),
lists:append(Route, LocalRoute).
local_route_list(Topic, PageNo, PageSize) when ?EMPTY_KEY(Topic) ->
TotalNum = lists:sum([ets:info(Tab, size) || Tab <- [mqtt_local_route]]),
Qh = qlc:append([qlc:q([E || E <- ets:table(Tab)]) || Tab <- [mqtt_local_route]]),
Data = query_table(Qh, PageNo, PageSize, TotalNum),
lists:map(fun({Topic1, Node}) -> {<<"$local/", Topic1/binary>>, Node} end, get_value(result, Data));
local_route_list(Topic, PageNo, PageSize) ->
Fun = fun() -> lists:append([ets:lookup(Tab, Topic) || Tab <- [mqtt_local_route]]) end,
Data = lookup_table(Fun, PageNo, PageSize),
lists:map(fun({Topic1, Node}) -> {<<"$local/", Topic1/binary>>, Node} end, Data).
format_error(Val, Msg) ->
re:replace(Msg, <<"\\$\\{[^}]+\\}">>, Val, [global, {return, binary}]).
hash(Password) ->
SaltBin = salt(),
<<SaltBin/binary, (md5_hash(SaltBin, Password))/binary>>.
md5_hash(SaltBin, Password) ->
erlang:md5(<<SaltBin/binary, Password/binary>>).
salt() ->
seed(),
Salt = rand:uniform(16#ffffffff),
<<Salt:32>>.
seed() ->
rand:seed(exsplus, erlang:timestamp()).

View File

@ -204,15 +204,16 @@ process(?CONNECT_PACKET(Var), State0) ->
client_id = ClientId, client_id = ClientId,
is_bridge = IsBridge} = Var, is_bridge = IsBridge} = Var,
State1 = State0#proto_state{proto_ver = ProtoVer, State1 = repl_username_with_peercert(
proto_name = ProtoName, State0#proto_state{proto_ver = ProtoVer,
username = Username, proto_name = ProtoName,
client_id = ClientId, username = Username,
clean_sess = CleanSess, client_id = ClientId,
keepalive = KeepAlive, clean_sess = CleanSess,
will_msg = willmsg(Var, State0), keepalive = KeepAlive,
is_bridge = IsBridge, will_msg = willmsg(Var, State0),
connected_at = os:timestamp()}, is_bridge = IsBridge,
connected_at = os:timestamp()}),
{ReturnCode1, SessPresent, State3} = {ReturnCode1, SessPresent, State3} =
case validate_connect(Var, State1) of case validate_connect(Var, State1) of
@ -407,6 +408,7 @@ shutdown(mnesia_conflict, _State) ->
%% let it down %% let it down
%% emqx_cm:unreg(ClientId); %% emqx_cm:unreg(ClientId);
ignore; ignore;
shutdown(Error, State = #proto_state{will_msg = WillMsg}) -> shutdown(Error, State = #proto_state{will_msg = WillMsg}) ->
?LOG(info, "Shutdown for ~p", [Error], State), ?LOG(info, "Shutdown for ~p", [Error], State),
Client = client(State), Client = client(State),

View File

@ -34,7 +34,10 @@
-export([start/0, stop/0]). -export([start/0, stop/0]).
%% Route APIs %% Route APIs
-export([add_route/1, del_route/1, match/1, print/1, has_route/1]). -export([add_route/1, get_routes/1, del_route/1, has_route/1]).
%% Match and print
-export([match/1, print/1]).
%% Local Route API %% Local Route API
-export([get_local_routes/0, add_local_route/1, match_local/1, -export([get_local_routes/0, add_local_route/1, match_local/1,
@ -130,6 +133,11 @@ add_trie_route(Route = #mqtt_route{topic = Topic}) ->
end, end,
mnesia:write(Route). mnesia:write(Route).
%% @doc Lookup Routes
-spec(get_routes(binary()) -> [mqtt_route()]).
get_routes(Topic) ->
ets:lookup(mqtt_route, Topic).
%% @doc Delete Route %% @doc Delete Route
-spec(del_route(binary() | mqtt_route()) -> ok | {error, Reason :: term()}). -spec(del_route(binary() | mqtt_route()) -> ok | {error, Reason :: term()}).
del_route(Topic) when is_binary(Topic) -> del_route(Topic) when is_binary(Topic) ->
@ -284,7 +292,5 @@ clean_routes_(Node) ->
mnesia:transaction(Clean). mnesia:transaction(Clean).
update_stats_() -> update_stats_() ->
Size = mnesia:table_info(mqtt_route, size), emqx_stats:setstats('routes/count', 'routes/max', mnesia:table_info(mqtt_route, size)).
emqx_stats:setstats('routes/count', 'routes/max', Size),
emqx_stats:setstats('topics/count', 'topics/max', Size).

View File

@ -1,52 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_cli_SUITE).
-compile(export_all).
-include("emqttd.hrl").
-include_lib("eunit/include/eunit.hrl").
all() ->
[{group, subscriptions}].
groups() ->
[{subscriptions, [sequence],
[t_subsciptions_list,
t_subsciptions_show,
t_subsciptions_add,
t_subsciptions_del]}].
init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
todo.
t_subsciptions_list(_) ->
todo.
t_subsciptions_show(_) ->
todo.
t_subsciptions_add(_) ->
todo.
t_subsciptions_del(_) ->
todo.

View File

@ -68,7 +68,6 @@ all() ->
{group, http}, {group, http},
{group, alarms}, {group, alarms},
{group, cli}, {group, cli},
{group, rest_api},
{group, cleanSession}]. {group, cleanSession}].
groups() -> groups() ->
@ -121,9 +120,7 @@ groups() ->
{cleanSession, [sequence], {cleanSession, [sequence],
[cleanSession_validate, [cleanSession_validate,
cleanSession_validate1] cleanSession_validate1]
}, }
{rest_api, [sequence],
[get_api_lists]}
]. ].
init_per_suite(Config) -> init_per_suite(Config) ->
@ -413,14 +410,59 @@ request_publish(_) ->
{client_id, <<"random">>}, {client_id, <<"random">>},
{clean_sess, false}]), {clean_sess, false}]),
SubParams = "{\"qos\":1, \"topic\" : \"a\/b\/c\", \"client_id\" :\"random\"}", SubParams = "{\"qos\":1, \"topic\" : \"a\/b\/c\", \"client_id\" :\"random\"}",
?assert(connect_emqttd_pubsub_(post, "api/v2/mqtt/subscribe", SubParams, auth_header_("admin", "public"))), ?assert(connect_emqx_pubsub_(post, "api/v2/mqtt/subscribe", SubParams, auth_header_("admin", "public"))),
ok = emqttd:subscribe(<<"a/b/c">>, self(), [{qos, 1}]), ok = emqx:subscribe(<<"a/b/c">>, self(), [{qos, 1}]),
Params = "{\"qos\":1, \"retain\":false, \"topic\" : \"a\/b\/c\", \"messages\" :\"hello\"}", Params = "{\"qos\":1, \"retain\":false, \"topic\" : \"a\/b\/c\", \"messages\" :\"hello\"}",
?assert(connect_emqttd_pubsub_(post, "api/v2/mqtt/publish", Params, auth_header_("admin", "public"))), ?assert(connect_emqx_pubsub_(post, "api/v2/mqtt/publish", Params, auth_header_("admin", "public"))),
?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end), ?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end),
UnSubParams = "{\"topic\" : \"a\/b\/c\", \"client_id\" :\"random\"}", UnSubParams = "{\"topic\" : \"a\/b\/c\", \"client_id\" :\"random\"}",
?assert(connect_emqttd_pubsub_(post, "api/v2/mqtt/unsubscribe", UnSubParams, auth_header_("admin", "public"))). ?assert(connect_emqx_pubsub_(post, "api/v2/mqtt/unsubscribe", UnSubParams, auth_header_("admin", "public"))).
connect_emqx_pubsub_(Method, Api, Params, Auth) ->
Url = "http://127.0.0.1:8080/" ++ Api,
case httpc:request(Method, {Url, [Auth], ?CONTENT_TYPE, Params}, [], []) of
{error, socket_closed_remotely} ->
false;
{ok, {{"HTTP/1.1", 200, "OK"}, _, _Return} } ->
true;
{ok, {{"HTTP/1.1", 400, _}, _, []}} ->
false;
{ok, {{"HTTP/1.1", 404, _}, _, []}} ->
false
end.
request(Path) ->
http_get(get, Path).
http_get(Method, Path) ->
req(Method, Path, []).
http_put(Method, Path, Params) ->
req(Method, Path, format_for_upload(Params)).
http_post(Method, Path, Params) ->
req(Method, Path, format_for_upload(Params)).
req(Method, Path, Body) ->
Url = ?URL ++ Path,
Headers = auth_header_("", ""),
case httpc:request(Method, {Url, [Headers]}, [], []) of
{error, R} ->
ct:log("R:~p~n", [R]),
false;
{ok, {{"HTTP/1.1", 200, "OK"}, _, _Return} } ->
true;
{ok, {{"HTTP/1.1", 400, _}, _, []}} ->
false;
{ok, {{"HTTP/1.1", 404, _}, _, []}} ->
false
end.
format_for_upload(none) ->
<<"">>;
format_for_upload(List) ->
iolist_to_binary(mochijson2:encode(List)).
connect_emqx_publish_(Method, Api, Params, Auth) -> connect_emqx_publish_(Method, Api, Params, Auth) ->
Url = "http://127.0.0.1:8080/" ++ Api, Url = "http://127.0.0.1:8080/" ++ Api,
@ -615,69 +657,6 @@ cleanSession_validate1(_) ->
emqttc:disconnect(Pub), emqttc:disconnect(Pub),
emqttc:disconnect(C11). emqttc:disconnect(C11).
get_api_lists(_Config) ->
lists:foreach(fun request/1, ?GET_API).
request_publish(_) ->
emqttc:start_link([{host, "localhost"},
{port, 1883},
{client_id, <<"random">>},
{clean_sess, false}]),
SubParams = "{\"qos\":1, \"topic\" : \"a\/b\/c\", \"client_id\" :\"random\"}",
?assert(connect_emqx_pubsub_(post, "api/v2/mqtt/subscribe", SubParams, auth_header_("", ""))),
ok = emqx:subscribe(<<"a/b/c">>, self(), [{qos, 1}]),
Params = "{\"qos\":1, \"retain\":false, \"topic\" : \"a\/b\/c\", \"messages\" :\"hello\"}",
?assert(connect_emqx_pubsub_(post, "api/v2/mqtt/publish", Params, auth_header_("", ""))),
?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end),
UnSubParams = "{\"topic\" : \"a\/b\/c\", \"client_id\" :\"random\"}",
?assert(connect_emqx_pubsub_(post, "api/v2/mqtt/unsubscribe", UnSubParams, auth_header_("", ""))).
connect_emqx_pubsub_(Method, Api, Params, Auth) ->
Url = "http://127.0.0.1:8080/" ++ Api,
case httpc:request(Method, {Url, [Auth], ?CONTENT_TYPE, Params}, [], []) of
{error, socket_closed_remotely} ->
false;
{ok, {{"HTTP/1.1", 200, "OK"}, _, _Return} } ->
true;
{ok, {{"HTTP/1.1", 400, _}, _, []}} ->
false;
{ok, {{"HTTP/1.1", 404, _}, _, []}} ->
false
end.
request(Path) ->
http_get(get, Path).
http_get(Method, Path) ->
req(Method, Path, []).
http_put(Method, Path, Params) ->
req(Method, Path, format_for_upload(Params)).
http_post(Method, Path, Params) ->
req(Method, Path, format_for_upload(Params)).
req(Method, Path, Body) ->
Url = ?URL ++ Path,
Headers = auth_header_("", ""),
case httpc:request(Method, {Url, [Headers]}, [], []) of
{error, R} ->
ct:log("R:~p~n", [R]),
false;
{ok, {{"HTTP/1.1", 200, "OK"}, _, _Return} } ->
true;
{ok, {{"HTTP/1.1", 400, _}, _, []}} ->
false;
{ok, {{"HTTP/1.1", 404, _}, _, []}} ->
false
end.
format_for_upload(none) ->
<<"">>;
format_for_upload(List) ->
iolist_to_binary(mochijson2:encode(List)).
ensure_ok(ok) -> ok; ensure_ok(ok) -> ok;
ensure_ok({error, {already_started, _}}) -> ok. ensure_ok({error, {already_started, _}}) -> ok.
@ -766,34 +745,3 @@ set_app_env({App, Lists}) ->
application:set_env(App, Par, Var) application:set_env(App, Par, Var)
end, Lists). end, Lists).
request(Path) ->
http_get(get, Path).
http_get(Method, Path) ->
req(Method, Path, []).
http_put(Method, Path, Params) ->
req(Method, Path, format_for_upload(Params)).
http_post(Method, Path, Params) ->
req(Method, Path, format_for_upload(Params)).
req(Method, Path, Body) ->
Url = ?URL ++ Path,
Headers = auth_header_("admin", "public"),
case httpc:request(Method, {Url, [Headers]}, [], []) of
{error, socket_closed_remotely} ->
false;
{ok, {{"HTTP/1.1", 200, "OK"}, _, _Return} } ->
true;
{ok, {{"HTTP/1.1", 400, _}, _, []}} ->
false;
{ok, {{"HTTP/1.1", 404, _}, _, []}} ->
false
end.
format_for_upload(none) ->
<<"">>;
format_for_upload(List) ->
iolist_to_binary(mochijson2:encode(List)).

View File

@ -1,149 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_config_SUITE).
-compile(export_all).
-include("emqttd.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
all() ->
[{group, emq_config}].
groups() ->
[{emq_config, [sequence],
[run_protocol_cmd,
run_client_cmd,
run_session_cmd,
run_queue_cmd,
run_auth_cmd,
run_lager_cmd,
run_connection_cmd,
run_broker_config]
}].
init_per_suite(Config) ->
Config.
end_per_suite(Config) ->
Config.
run_protocol_cmd(_Config) ->
SetConfigKeys = [{"max_clientid_len=2048", int},
{"max_packet_size=1024", int},
% {"websocket_protocol_header=off", atom},
{"keepalive_backoff=0.5", float}],
lists:foreach(fun set_cmd/1, SetConfigKeys),
R = lists:sort(lists:map(fun env_value/1, SetConfigKeys)),
{ok, E} = application:get_env(emqttd, protocol),
?assertEqual(R, lists:sort(E)),
emqttd_cli_config:run(["config", "set", "mqtt.websocket_protocol_header=off", "--app=emqttd"]),
{ok, E1} = application:get_env(emqttd, websocket_protocol_header),
?assertEqual(false, E1).
run_client_cmd(_Config) ->
SetConfigKeys = [{"max_publish_rate=100", int},
{"idle_timeout=60s", date},
{"enable_stats=on", atom}],
lists:foreach(fun(Key) -> set_cmd("client", Key) end, SetConfigKeys),
R = lists:sort(lists:map(fun(Key) -> env_value("client", Key) end, SetConfigKeys)),
{ok, E} = application:get_env(emqttd, client),
?assertEqual(R, lists:sort(E)).
run_session_cmd(_Config) ->
SetConfigKeys = [{"max_subscriptions=5", int},
{"upgrade_qos=on", atom},
{"max_inflight=64", int},
{"retry_interval=60s", date},
{"max_awaiting_rel=200", int},
{"await_rel_timeout=60s",date},
{"enable_stats=on", atom},
{"expiry_interval=60s", date},
{"ignore_loop_deliver=true", atom}],
lists:foreach(fun(Key) -> set_cmd("session", Key) end, SetConfigKeys),
R = lists:sort(lists:map(fun env_value/1, SetConfigKeys)),
{ok, E} = application:get_env(emqttd, session),
?assertEqual(R, lists:sort(E)).
run_queue_cmd(_Config) ->
SetConfigKeys = [{"type=priority", atom},
{"priority=hah", string},
{"max_length=2000", int},
{"low_watermark=40%",percent},
{"high_watermark=80%", percent},
{"store_qos0=false", atom}],
lists:foreach(fun(Key) -> set_cmd("mqueue", Key) end, SetConfigKeys),
R = lists:sort(lists:map(fun env_value/1, SetConfigKeys)),
{ok, E} = application:get_env(emqttd, mqueue),
?assertEqual(R, lists:sort(E)).
run_auth_cmd(_Config) ->
SetConfigKeys = [{"allow_anonymous=true", atom},
{"acl_nomatch=deny", atom},
{"acl_file=etc/test.acl", string},
{"cache_acl=false", atom}],
lists:foreach(fun set_cmd/1, SetConfigKeys),
{ok, true} = application:get_env(emqttd, allow_anonymous),
{ok, deny} = application:get_env(emqttd, acl_nomatch),
{ok, "etc/test.acl"} = application:get_env(emqttd, acl_file),
{ok, false} = application:get_env(emqttd, cache_acl).
run_lager_cmd(_Config) ->
emqttd_cli_config:run(["config", "set", "log.console.level=info", "--app=emqttd"]),
ok.
run_connection_cmd(_Config) ->
emqttd_cli_config:run(["config", "set", "mqtt.conn.force_gc_count=1000", "--app=emqttd"]),
{ok, E} = application:get_env(emqttd, conn_force_gc_count),
?assertEqual(1000, E).
run_broker_config(_Config) ->
emqttd_cli_config:run(["config", "set", "mqtt.broker.sys_interval=10", "--app=emqttd"]),
{ok, E} = application:get_env(emqttd, broker_sys_interval),
?assertEqual(10, E).
env_value("client", {Key, Type}) ->
case string:split(Key, "=") of
["max_publish_rate", V] ->
{list_to_atom("max_publish_rate"), format(Type, V)};
[K, V] ->
{list_to_atom(string:join(["client", K], "_")), format(Type, V)}
end.
env_value({Key, Type}) ->
[K, V] = string:split(Key, "="),
{list_to_atom(K), format(Type, V)}.
format(string, S) -> S;
format(atom, "on") -> true;
format(atom, "off") -> false;
format(atom, A) -> list_to_atom(A);
format(float, F) -> list_to_float(F);
format(percent, P) ->
{match, [N]} = re:run(P, "^([0-9]+)%$", [{capture, all_but_first, list}]),
list_to_integer(N) / 100;
format(int, I) -> list_to_integer(I);
format(date, _I) -> 60000.
set_cmd({Key, _Type}) ->
emqttd_cli_config:run(["config", "set", string:join(["mqtt", Key], "."), "--app=emqttd"]).
set_cmd(Pre, {Key, _Type}) ->
emqttd_cli_config:run(["config", "set", string:join(["mqtt", Pre, Key], "."), "--app=emqttd"]).

View File

@ -14,11 +14,11 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqttd_mod_SUITE). -module(emqx_mod_SUITE).
-compile(export_all). -compile(export_all).
-include("emqttd.hrl"). -include("emqx.hrl").
all() -> [mod_subscription_rep]. all() -> [mod_subscription_rep].

View File

@ -14,15 +14,15 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqttd_router_SUITE). -module(emqx_router_SUITE).
-compile(export_all). -compile(export_all).
-include("emqttd.hrl"). -include("emqx.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-define(R, emqttd_router). -define(R, emqx_router).
all() -> all() ->
[{group, route}, [{group, route},
@ -44,11 +44,11 @@ groups() ->
init_per_suite(Config) -> init_per_suite(Config) ->
ekka:start(), ekka:start(),
ekka_mnesia:ensure_started(), ekka_mnesia:ensure_started(),
{ok, _R} = emqttd_router:start(), {ok, _R} = emqx_router:start(),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqttd_router:stop(), emqx_router:stop(),
ekka:stop(), ekka:stop(),
ekka_mnesia:ensure_stopped(), ekka_mnesia:ensure_stopped(),
ekka_mnesia:delete_schema(). ekka_mnesia:delete_schema().
@ -148,7 +148,7 @@ router_add_del(_) ->
%% Del %% Del
?R:del_route(<<"a/b/c">>), ?R:del_route(<<"a/b/c">>),
[R1, R2] = lists:sort(?R:match(<<"a/b/c">>)), [R1, R2] = lists:sort(?R:match(<<"a/b/c">>)),
{atomic, []} = mnesia:transaction(fun emqttd_trie:lookup/1, [<<"a/b/c">>]), {atomic, []} = mnesia:transaction(fun emqx_trie:lookup/1, [<<"a/b/c">>]),
%% Batch Del %% Batch Del
R3 = #mqtt_route{topic = <<"#">>, node = 'a@127.0.0.1'}, R3 = #mqtt_route{topic = <<"#">>, node = 'a@127.0.0.1'},
@ -169,6 +169,6 @@ t_print(_) ->
?R:del_route(<<"#">>). ?R:del_route(<<"#">>).
router_unused(_) -> router_unused(_) ->
gen_server:call(emqttd_router, bad_call), gen_server:call(emqx_router, bad_call),
gen_server:cast(emqttd_router, bad_msg), gen_server:cast(emqx_router, bad_msg),
emqttd_router ! bad_info. emqx_router ! bad_info.