%% FIFO Server %% %% Usage: %% {ok,Pid} = qserver:start('aa'). %% qserver:start('bb'). %% qserver:start('cc'). %% qserver:listq(). %% qserver:inq('aa', 123). %% qserver:inq('aa', [1,2,3]). %% qserver:inq('bb', hello). %% qserver:lenq('aa'). %% qserver:revq('aa'). %% qserver:outq('bb'). %% qserver:outq(Pid). %% qserver:flushq('aa'). %% qserver:stop('aa'). %% -module(qserver). %% Exported functions -export([start/1, stop/1, recode/2, listq/0, inq/2, outq/1, flushq/1, lenq/1, revq/1]). %% Internal functions -export([call/3, cast/3, loop/2, do_q/3, debug_q/3, is_running/1]). -record(state, {q}). %% queues limit -define(MAXQ, 256). %% Exported functions start(Name) -> case pg2:get_closest_pid(Name) of {error, _} -> Pid = spawn(?MODULE, loop, [ fun(M,V,S) -> qserver:do_q(M,V,S) end, #state{q=queue:new()} ]), pg2:create(Name), pg2:join(Name, Pid), {ok, Pid}; Pid -> {ok, Pid} end. stop(Name) -> cast(Name, stop, Name). recode(Name, Code) -> cast(Name, change_code, Code). %% List all queues listq() -> pg2:which_groups(). %% Add value to the tail of the queue inq(Name,Val) -> call(Name, in_q, Val). %% Extract value from the head of the queue outq(Name) -> call(Name, out_q, none). %% queue length lenq(Name) -> call(Name, len_q, none). %% reverse the whole queue revq(Name) -> call(Name, reverse_q, none). %% flush the queue = new empty queue flushq(Name) -> call(Name, flush_q, none). %% Internal functions %% expect some reply call(Pid, Req, Args) when is_pid(Pid) -> Pid ! {call, self(), Req, Args}, receive {ok, Reply} -> Reply; {'EXIT', Why} -> exit(Why) end; call(Name, Req, Args) -> case is_running(Name) of {true, Pid} -> call(Pid, Req, Args); {false, Why} -> {oops, Why} end. %% send and pray - no reply cast(Pid, Req, Args) when is_pid(Pid) -> Pid ! {cast, Req, Args}, ok; cast(Name, Req, Args) -> case is_running(Name) of {true, Pid} -> cast(Pid, Req, Args); {false, Why} -> {oops, Why} end. loop(Fun, State) -> receive {cast, stop, Name} -> pg2:delete(Name), exit(normal); {cast, change_code, NewFun} -> loop(NewFun, State); {call, From, Mode, Val} -> case (catch Fun(Mode, Val, State)) of {'EXIT', Why} -> From ! {'EXIT', Why}, loop(Fun, State); {Reply, State1} -> From ! {ok, Reply}, loop(Fun, State1) end end. %% check if process is alive is_running(Name)-> case pg2:get_closest_pid(Name) of {error, Reason} -> {false, Reason}; Pid -> {rpc:call(node(Pid), erlang, is_process_alive, [Pid]), Pid} end. %% Operations with the queue %% Usage: %% qserver:recode('aa', fun(M,V,S) -> qserver:do_q(M,V,S) end). %% Add value to the tail of the queue do_q(Mode, Val, #state{q=Q1}) when Mode == in_q -> case queue:len(Q1) < ?MAXQ of true -> Q2 = queue:in(Val, Q1), {{ok, Val}, #state{q=Q2}}; false -> {'EXIT', full} end; %% Extract value from the head of the queue do_q(Mode, _Val, #state{q=Q1}) when Mode == out_q -> case queue:is_empty(Q1) of false -> {{value,Val}, Q2} = queue:out(Q1), {Val, #state{q=Q2}}; true -> {'EXIT', empty} end; %% queue length do_q(Mode, _Val, #state{q=Q1} = State) when Mode == len_q -> {queue:len(Q1), State}; %% reverse the whole queue do_q(Mode, _Val, #state{q=Q1}) when Mode == reverse_q -> Q2 = queue:reverse(Q1), {ok, #state{q=Q2}}; %% flush the queue do_q(Mode, _Val, _State) when Mode == flush_q -> Q2 = queue:new(), {ok, #state{q=Q2}}; %% everything else do_q(_Mode, _Val, State) -> {undefined, State}. %% Debug the queue %% Usage: %% qserver:recode('aa', fun(M,V,S) -> qserver:debug_q(M,V,S) end). debug_q(Mode, Val, State) -> io:format("mode: ~p, val: ~p~n",[Mode, Val]), {Val, State}.