%%   CouchDB
%%   Copyright (C) 2006  Damien Katz
%%
%%   This program is free software; you can redistribute it and/or
%%   modify it under the terms of the GNU General Public License
%%   as published by the Free Software Foundation; either version 2
%%   of the License, or (at your option) any later version.
%%
%%   This program is distributed in the hope that it will be useful,
%%   but WITHOUT ANY WARRANTY; without even the implied warranty of
%%   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
%%   GNU General Public License for more details.
%%
%%   You should have received a copy of the GNU General Public License
%%   along with this program; if not, write to the Free Software
%%   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

-module(couch_db).
-behaviour(gen_server).

% opening, creating, closing and inspecting a database
-export([open/2, create/2, create/3, close/1, get_info/1]).
% reading documents
-export([open_doc/2, open_doc/3, open_doc_revs/4, get_doc_info/2, get_missing_revs/2]).
-export([enum_docs/4, enum_docs/5, enum_docs_since/4, enum_docs_since/5]).
% writing documents
-export([save_doc/2, save_doc/3, save_docs/2, save_docs/3, save_doc_revs/3, delete_doc/3]).
% stew groups, view groups and temporary view groups
-export([update_stew_group_sync/2, update_stew_group/2, fold_stew/6, fold_stew/7]).
-export([update_view_group_sync/2, update_view_group/2, fold_view/6, fold_view/7]).
-export([update_temp_view_group_sync/2, fold_temp_view/5, fold_temp_view/6]).
% revision tree helpers
-export([rev_list_to_trees/2, merge_rev_trees/2]).
% entry point of the process started with spawn_link/3 in init_main/4
-export([update_loop/2]).
% gen_server callbacks
-export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).

-include("couch_db.hrl").

% size of each segment of the doubly written header
-define(HEADER_SIZE, 2048).
% fixed signature at the beginning of each file header
-define(DB_FILE_SIG, <<$g, $m, $k, 0>>).
% the current file version this code base uses.
% In the future CouchDB will have the ability
% to use/convert different file versions.
-define(FILE_VERSION, 1).
% Header record. init_main/4 reads the *_state fields to reopen the summary
% stream and the btrees, and takes last_update_seq and doc_count from it.
-record(db_header, {
    write_version = 0,
    last_update_seq = 0,
    summary_stream_state = nil,
    docinfo_by_Id_btree_state = nil,
    docinfo_by_seq_btree_state = nil,
    view_group_btree_state = nil,
    stew_group_btree_state = nil,
    local_docs_btree_state = nil,
    doc_count=0
}).

% In-memory database state handed between the db server and the update process.
-record(db, {
    fd=0,
    supervisor=0,
    header = #db_header{},
    uncommitted_writes = false,
    summary_stream,
    docinfo_by_Id_btree,
    docinfo_by_seq_btree,
    local_docs_btree,
    last_update_seq,
    view_group_btree,
    stew_group_btree,
    doc_count,
    name,
    view_group_mgr,
    stew_group_mgr
}).

% gen_server state of the db server process.
-record(main, {
    db,
    update_pid,
    view_group_mgr,
    temp_view_group_mgr,
    stew_group_mgr
}).

% Starts a couch_db_sup supervisor, then adds the couch_file process and the
% couch_db gen_server as its children.
% Returns {ok, SupervisorPid} or {error, Reason}; on failure to open the file
% the supervisor is killed (enoent is reported as not_found).
start_link(DbName, Filepath, Options) ->
    {ok, Super} = couch_db_sup:start_link(),
    FileChildSpec =
        {couch_file,
            {couch_file, open, [Filepath, Options]},
            permanent,
            brutal_kill,
            worker,
            [couch_file]},
    case supervisor:start_child(Super, FileChildSpec) of
    {ok, Fd} ->
        DbChildSpec =
            {couch_db,
                % the MFA below calls
                % gen_server:start_link(couch_db, {{DbName, FilePath}, Fd, Super, Options}, []).
                {gen_server, start_link, [couch_db, {{DbName, Filepath}, Fd, Super, Options}, []]},
                permanent,
                10,
                worker,
                [couch_db]},
        {ok, _Db} = supervisor:start_child(Super, DbChildSpec),
        {ok, Super};
    Failure ->
        exit(Super, kill),
        Reason =
            case Failure of
            {error, {enoent, _ChildInfo}} ->
                % couldn't find file
                not_found;
            {error, {Error, _ChildInfo}} ->
                Error;
            Else ->
                Else
            end,
        {error, Reason}
    end.

%%% Interface functions %%%

% Creates a database, using the file path as the database name.
create(Filepath, Options) ->
    create(Filepath, Filepath, Options).

% Creates a database by starting it with the create option prepended.
create(DbName, Filepath, Options) when is_list(Options) ->
    start_link(DbName, Filepath, [create | Options]).

% Opens an existing database file with no options.
open(DbName, Filepath) ->
    start_link(DbName, Filepath, []).

% Saves one deleted #doc{} per given revision as a new edit.
% Returns {ok, Results} as produced by save_doc_revs/3.
delete_doc(SupPid, Id, Revisions) ->
    Tombstones = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions],
    {ok, _Results} = save_doc_revs(SupPid, Tombstones, [new_edits]).

% Opens a document by id or #doc_info{}; a doc info flagged as deleted is
% answered directly without touching the database.
open_doc(_SupPid, #doc_info{deleted=true}) ->
    {not_found, deleted};
open_doc(SupPid, IdOrDocInfo) ->
    open_doc(SupPid, IdOrDocInfo, []).
% Opens a document (given as id or #doc_info{}) against the db state
% currently held by the db server.
open_doc(SupPid, Id, Options) ->
    Db = get_db(db_pid(SupPid)),
    open_doc_int(Db, Id, Options).

% Opens the requested revisions of a document against the current db state.
open_doc_revs(SupPid, Id, Revs, Options) ->
    Db = get_db(db_pid(SupPid)),
    open_doc_revs_int(Db, Id, Revs, Options).

% For each {Id, Revs} pair returns {Id, MissingRevs}: the revs that are not
% present in the document's revision trees (all of them when the document
% is not found).
get_missing_revs(SupPid, IdRevsList) ->
    Ids = [Id1 || {Id1, _Revs} <- IdRevsList],
    {ok, LookupResults} = get_full_doc_infos(SupPid, Ids),
    MissingPerDoc = lists:zipwith(
        fun({Id, Revs}, LookupResult) ->
            case LookupResult of
            {ok, {_, RevisionTrees}} ->
                {Id, find_missing_revs(Revs, RevisionTrees)};
            {not_found, _} ->
                {Id, Revs}
            end
        end,
        IdRevsList, LookupResults),
    {ok, MissingPerDoc}.

% Returns {ok, #doc_info{}} for the document, or whatever
% get_full_doc_info/2 returned otherwise.
get_doc_info(Db, Id) ->
    case get_full_doc_info(Db, Id) of
    {ok, {DocInfo, _RevisionTrees}} ->
        {ok, DocInfo};
    Other ->
        Other
    end.

% Single-id variant of get_full_doc_infos/2.
% Returns {ok, {DocInfo, RevisionTrees}} or not_found.
get_full_doc_info(Db, Id) ->
    case get_full_doc_infos(Db, [Id]) of
    {ok, [{ok, FullDocInfo}]} ->
        {ok, FullDocInfo};
    {ok, [{not_found, Id}]} ->
        not_found
    end.

% Looks the ids up in the by-id btree. Accepts either the supervisor pid or
% a #db{} record. Each result is {ok, {#doc_info{}, RevisionTrees}} or
% {not_found, Id}.
get_full_doc_infos(SupPid, Ids) when is_pid(SupPid) ->
    get_full_doc_infos(get_db(db_pid(SupPid)), Ids);
get_full_doc_infos(#db{docinfo_by_Id_btree=ByIdBtree}, Ids) ->
    {ok, LookupResults} = couch_btree:lookup(ByIdBtree, Ids),
    ToFullDocInfo =
        fun({ok, {Id, {UpdateSeq, Rev, SummaryPointer, RevisionTrees}}}) ->
            {Conflicts, DeletedConflicts} = get_conflict_revs(RevisionTrees),
            DocInfo = #doc_info{
                id=Id,
                rev=Rev,
                update_seq=UpdateSeq,
                summary_pointer=SummaryPointer,
                deleted=(summary_ptr_type(SummaryPointer)==deletion),
                conflict_revs=Conflicts,
                deleted_conflict_revs=DeletedConflicts
            },
            {ok, {DocInfo, RevisionTrees}};
        ({not_found, {Id, _}}) ->
            {not_found, Id}
        end,
    {ok, lists:map(ToFullDocInfo, LookupResults)}.

% Returns {ok, [{doc_count, Count}, {last_update_seq, Seq}]}. Accepts either
% the supervisor pid or a #db{} record.
get_info(SupPid) when is_pid(SupPid) ->
    get_info(get_db(db_pid(SupPid)));
get_info(#db{doc_count=Count, last_update_seq=SeqNum}) ->
    {ok, [{doc_count, Count}, {last_update_seq, SeqNum}]}.

% Saves a single document as a new edit with no extra options.
save_doc(SupPid, Doc) ->
    save_doc(SupPid, Doc, []).

% Saves a single document as a new edit and returns its individual result.
save_doc(SupPid, Doc, Options) ->
    DocsAndOptions = [{[Doc], [new_edits | Options]}],
    {ok, [[Result]]} = save_docs(SupPid, DocsAndOptions, Options),
    Result.
% Saves a list of revisions of one document and returns {ok, Results} for
% that single group.
save_doc_revs(SupPid, Docs, Options) ->
    {ok, [GroupResult]} = save_docs(SupPid, [{Docs, Options}], Options),
    {ok, GroupResult}.

% Same as save_docs/3 with no transaction options.
save_docs(SupPid, DocAndOptions) ->
    save_docs(SupPid, DocAndOptions, []).

% Writes the attachments of every document into the database file, then
% hands the {Docs, Options} groups to the db server as an update_docs call.
save_docs(SupPid, DocsAndOptions, TransactionOptions) ->
    % flush unwritten binaries to disk.
    Db = get_db(db_pid(SupPid)),
    Flushed =
        [{[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Docs], Options}
            || {Docs, Options} <- DocsAndOptions],
    {ok, _RetValues} =
        gen_server:call(db_pid(SupPid), {update_docs, Flushed, TransactionOptions}).

% Returns Doc with every attachment value rewritten to {Fd, StreamPointer, Len}.
% Attachments already pointing at Fd are left alone, attachments living in
% another file are copied over, and raw binaries are written out.
doc_flush_binaries(Doc, Fd) ->
    Attachments = Doc#doc.attachments,
    % calc size of binaries to write out
    BytesToWrite = lists:foldl(
        fun(Attachment, Total) ->
            case Attachment of
            {_Key, {_Type, {Fd0, _StreamPointer, _Len}}} when Fd0 == Fd ->
                % already written to our file, nothing to write
                Total;
            {_Key, {_Type, {_OtherFd, _StreamPointer, Len}}} ->
                % written to a different file
                Total + Len;
            {_Key, {_Type, Bin}} when is_binary(Bin) ->
                Total + size(Bin)
            end
        end,
        0, Attachments),

    {ok, OutputStream} = couch_stream:open(Fd),
    ok = couch_stream:ensure_buffer(OutputStream, BytesToWrite),

    FlushedAttachments = lists:map(
        fun({Key, {Type, Value}}) ->
            FlushedValue =
                case Value of
                {Fd0, StreamPointer, Len} when Fd0 == Fd ->
                    % already written to our file, nothing to write
                    {Fd, StreamPointer, Len};
                {OtherFd, StreamPointer, Len} ->
                    % written to a different file (or a closed file
                    % instance, which will cause an error)
                    CopyChunk =
                        fun(Bin, {BeginPointer, SizeAcc}) ->
                            {ok, Pointer} = couch_stream:write(OutputStream, Bin),
                            case SizeAcc of
                            0 ->
                                % this was the first write, record the pointer
                                {ok, {Pointer, size(Bin)}};
                            _ ->
                                {ok, {BeginPointer, SizeAcc + size(Bin)}}
                            end
                        end,
                    % Len is already bound, so this match also asserts that
                    % exactly Len bytes were copied
                    {ok, {NewStreamPointer, Len}, _EndSp} =
                        couch_stream:foldl(OtherFd, StreamPointer, Len, CopyChunk, {{0,0}, 0}),
                    {Fd, NewStreamPointer, Len};
                % NOTE(review): an empty binary is accepted by the size fold
                % above but matches no clause here (case_clause).
                Bin when is_binary(Bin), size(Bin) > 0 ->
                    {ok, StreamPointer} = couch_stream:write(OutputStream, Bin),
                    {Fd, StreamPointer, size(Bin)}
                end,
            {Key, {Type, FlushedValue}}
        end,
        Attachments),

    {ok, _FinalPos} = couch_stream:close(OutputStream),

    Doc#doc{attachments = FlushedAttachments}.

% Folds InFun over the by-sequence btree starting at SinceSeq + 1, passing a
% #doc_info{} built from each entry.
enum_docs_since(SupPid, SinceSeq, Direction, InFun, Ctx) ->
    Db = get_db(db_pid(SupPid)),
    Wrapper =
        fun({UpdateSeq, {Id, Rev, SummaryPointer, ConflictRevs, DeletedConflictRevs}}, Offset, EnumCtx) ->
            DocInfo = #doc_info{
                id = Id,
                rev = Rev,
                update_seq = UpdateSeq,
                summary_pointer = SummaryPointer,
                conflict_revs = ConflictRevs,
                deleted_conflict_revs = DeletedConflictRevs,
                deleted = (summary_ptr_type(SummaryPointer) == deletion)
            },
            InFun(DocInfo, Offset, EnumCtx)
        end,
    couch_btree:fold(Db#db.docinfo_by_seq_btree, SinceSeq + 1, Direction, Wrapper, Ctx).

% Forward-direction variant of enum_docs_since/5.
enum_docs_since(SupPid, SinceSeq, InFun, Acc) ->
    enum_docs_since(SupPid, SinceSeq, fwd, InFun, Acc).

% Folds InFun over the by-id btree starting at StartId, passing a
% #doc_info{} built from each entry.
enum_docs(SupPid, StartId, Direction, InFun, InAcc) ->
    Db = get_db(db_pid(SupPid)),
    Wrapper =
        fun({Id, {UpdateSeq, Rev, SummaryPointer, RevTrees}}, Offset, Acc) ->
            {ConflictRevs, DeletedConflictRevs} = get_conflict_revs(RevTrees),
            DocInfo = #doc_info{
                id = Id,
                rev = Rev,
                update_seq = UpdateSeq,
                summary_pointer = SummaryPointer,
                deleted = (summary_ptr_type(SummaryPointer) == deletion),
                conflict_revs = ConflictRevs,
                deleted_conflict_revs = DeletedConflictRevs
            },
            InFun(DocInfo, Offset, Acc)
        end,
    couch_btree:fold(Db#db.docinfo_by_Id_btree, StartId, Direction, Wrapper, InAcc).

% Forward-direction variant of enum_docs/5.
enum_docs(SupPid, StartId, InFun, Ctx) ->
    enum_docs(SupPid, StartId, fwd, InFun, Ctx).

% Asks the db server to update a stew group; status notifications are ignored.
update_stew_group(SupPid, ViewGroupDocId) ->
    IgnoreStatus = fun(_Whatever) -> ok end,
    gen_server:call(db_pid(SupPid), {update_stew_group, ViewGroupDocId, IgnoreStatus}).

% Asks the db server to update a view group; status notifications are ignored.
update_view_group(SupPid, ViewGroupDocId) ->
    IgnoreStatus = fun(_Whatever) -> ok end,
    gen_server:call(db_pid(SupPid), {update_view_group, ViewGroupDocId, IgnoreStatus}).

% Status callback used by the synchronous update calls.
% partial  -> returns a fun that calls this function again with the next status
% complete -> sends {Ref, ok} to Pid
% anything else is forwarded to Pid as {Ref, Status}
sync_update_notify(Pid, Ref, partial) ->
    % We want to wait until complete
    % so return a fun that calls ourself
    fun(NextStatus) -> sync_update_notify(Pid, Ref, NextStatus) end;
sync_update_notify(Pid, Ref, complete) ->
    Pid ! {Ref, ok};
sync_update_notify(Pid, Ref, Other) ->
    Pid ! {Ref, Other}.
% Updates a stew group and waits for the final status.
update_stew_group_sync(SupPid, ViewGroupDocId) ->
    update_view_group_sync0(SupPid, update_stew_group, ViewGroupDocId).

% Updates a view group and waits for the final status.
update_view_group_sync(SupPid, ViewGroupDocId) ->
    update_view_group_sync0(SupPid, update_view_group, ViewGroupDocId).

% Updates a temporary view group (keyed by its source) and waits for the
% final status.
update_temp_view_group_sync(SupPid, MapFunSrc) ->
    update_view_group_sync0(SupPid, update_temp_view_group, MapFunSrc).

% Sends {Type, Id, NotifyFun} to the db server. When the server answers ok,
% blocks until sync_update_notify/3 delivers {Ref, Result} to this process
% and returns Result; any other answer is returned as is.
update_view_group_sync0(SupPid, Type, Id) ->
    Caller = self(),
    Ref = make_ref(),
    NotifyFun = fun(Status) -> sync_update_notify(Caller, Ref, Status) end,
    case gen_server:call(db_pid(SupPid), {Type, Id, NotifyFun}, infinity) of
    ok ->
        receive
        {Ref, Result} ->
            Result
        end;
    Other ->
        Other
    end.

% Fetches the stew group from the db server and folds over the named view.
fold_stew(SupPid, ViewGroupDocId, ViewName, Dir, Fun, Acc) ->
    case gen_server:call(db_pid(SupPid), {get_stew_group, ViewGroupDocId}) of
    {ok, Group} ->
        couch_stew_group:fold(Group, ViewName, Dir, Fun, Acc);
    Other ->
        Other
    end.

% Like fold_stew/6, starting at StartKey.
fold_stew(SupPid, ViewGroupDocId, ViewName, StartKey, Dir, Fun, Acc) ->
    case gen_server:call(db_pid(SupPid), {get_stew_group, ViewGroupDocId}) of
    {ok, Group} ->
        couch_stew_group:fold(Group, ViewName, StartKey, Dir, Fun, Acc);
    Other ->
        Other
    end.

% Fetches the view group from the db server and folds over the named view.
fold_view(SupPid, ViewGroupDocId, ViewName, Dir, Fun, Acc) ->
    case gen_server:call(db_pid(SupPid), {get_view_group, ViewGroupDocId}) of
    {ok, Group} ->
        couch_view_group:fold(Group, ViewName, Dir, Fun, Acc);
    Other ->
        Other
    end.

% Like fold_view/6, starting at StartKey.
fold_view(SupPid, ViewGroupDocId, ViewName, StartKey, Dir, Fun, Acc) ->
    case gen_server:call(db_pid(SupPid), {get_view_group, ViewGroupDocId}) of
    {ok, Group} ->
        couch_view_group:fold(Group, ViewName, StartKey, Dir, Fun, Acc);
    Other ->
        Other
    end.

% Fetches the temporary view group for Src and folds over it; Src is also
% passed as the view name.
fold_temp_view(SupPid, Src, Dir, Fun, Acc) ->
    case gen_server:call(db_pid(SupPid), {get_temp_view_group, Src}) of
    {ok, Group} ->
        couch_view_group:fold(Group, Src, Dir, Fun, Acc);
    Other ->
        Other
    end.
% Like fold_temp_view/5, starting at StartKey.
fold_temp_view(SupPid, Src, StartKey, Dir, Fun, Acc) ->
    case gen_server:call(db_pid(SupPid), {get_temp_view_group, Src}) of
    {ok, Group} ->
        couch_view_group:fold(Group, Src, StartKey, Dir, Fun, Acc);
    Other ->
        Other
    end.

% Unlinks from the supervisor, sends it an exit signal with reason normal
% and waits for the monitor's 'DOWN' message.
close(SupPid) ->
    MonitorRef = erlang:monitor(process, SupPid),
    unlink(SupPid),
    exit(SupPid, normal),
    receive
    {'DOWN', MonitorRef, process, SupPid, _Reason} ->
        ok
    end.


% server functions

% gen_server init callback. With the create option a fresh header is
% written; otherwise the existing header is read from the file.
init({{DbName, Filepath}, Fd, Supervisor, Options}) ->
    case lists:member(create, Options) of
    true ->
        init_main({DbName, Filepath}, Fd, Supervisor, nil);
    false ->
        {ok, Header} = read_header(Filepath, Fd),
        init_main({DbName, Filepath}, Fd, Supervisor, Header)
    end.

% Builds the server state from a header. A nil header means "new file":
% the file is expanded by two header segments, a default header is written
% and synced, and initialisation continues with the written header.
init_main({DbName, Filepath}, Fd, Supervisor, nil) ->
    % creates a new header and writes it to the file
    {ok, _} = couch_file:expand(Fd, 2*(?HEADER_SIZE)),
    {ok, WrittenHeader} = write_header(Fd, #db_header{}),
    ok = couch_file:sync(Fd),
    init_main({DbName, Filepath}, Fd, Supervisor, WrittenHeader);
init_main({DbName, Filepath}, Fd, Supervisor, Header) ->
    {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
    ok = couch_stream:set_min_buffer(SummaryStream, 10000),
    {ok, IdBtree} = couch_btree:open(Header#db_header.docinfo_by_Id_btree_state, Fd),
    {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd),
    {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd),
    {ok, ViewGroupBtree} = couch_btree:open(Header#db_header.view_group_btree_state, Fd),
    {ok, StewGroupBtree} = couch_btree:open(Header#db_header.stew_group_btree_state, Fd),

    Db = #db{
        fd=Fd,
        supervisor=Supervisor,
        header=Header,
        summary_stream = SummaryStream,
        docinfo_by_Id_btree = IdBtree,
        docinfo_by_seq_btree = SeqBtree,
        local_docs_btree = LocalDocsBtree,
        last_update_seq = Header#db_header.last_update_seq,
        view_group_btree = ViewGroupBtree,
        stew_group_btree = StewGroupBtree,
        doc_count = Header#db_header.doc_count,
        name = DbName
    },

    % all writes are serialised through this linked process
    UpdatePid = spawn_link(couch_db, update_loop, [self(), Db]),

    ServerPid = self(),

    % callbacks handed to the group managers for reading group info
    GetStewGroupInfoFun =
        fun(GroupKey) ->
            get_stew_group_info(get_db(ServerPid), GroupKey)
        end,

    GetViewGroupInfoFun =
        fun(GroupKey) ->
            get_view_group_info(get_db(ServerPid), GroupKey)
        end,

    GetTempViewGroupInfoFun =
        fun(GroupKey) ->
            % for temp views, the groupkey is the source. and we never persist info
            % the key has the form Type ++ "|" ++ Function
            NotBar = fun($|) -> false; (_) -> true end,
            Type = lists:takewhile(NotBar, GroupKey),
            [$| | Function] = lists:dropwhile(NotBar, GroupKey),
            {ok, {{Type, [{GroupKey, Function}]}, nil}}
        end,

    % callbacks handed to the group managers for reporting updated group info
    UpdateStewGroupInfoFun =
        fun(GroupKey, UpdateStatus, GroupInfo) ->
            % send the updated stew group info to the update process
            UpdatePid ! {stew_group_updated, GroupKey, UpdateStatus, GroupInfo},
            ok
        end,

    UpdateViewGroupInfoFun =
        fun(GroupKey, UpdateStatus, GroupInfo) ->
            % send the updated view group info to the update process
            UpdatePid ! {view_group_updated, GroupKey, UpdateStatus, GroupInfo},
            ok
        end,

    UpdateTempViewGroupInfoFun =
        fun(_GroupKey, _UpdateStatus, _GroupInfo) ->
            ok % do nothing
        end,

    % temporary views are kept in a separate, overwritten ".temp" file
    {ok, TempFd} = couch_file:open(Filepath ++ ".temp", [create,overwrite]),
    {ok, ViewMgr} =
        couch_view_group:start_manager(Supervisor, Fd, GetViewGroupInfoFun, UpdateViewGroupInfoFun),
    {ok, TempViewMgr} =
        couch_view_group:start_manager(Supervisor, TempFd, GetTempViewGroupInfoFun, UpdateTempViewGroupInfoFun),
    {ok, StewMgr} =
        couch_stew_group:start_manager(Supervisor, Fd, GetStewGroupInfoFun, UpdateStewGroupInfoFun),

    UpdatePid ! {set_view_group_mgr, ViewMgr},
    UpdatePid ! {set_stew_group_mgr, StewMgr},

    State = #main{
        db=Db,
        update_pid=UpdatePid,
        view_group_mgr=ViewMgr,
        temp_view_group_mgr=TempViewMgr,
        stew_group_mgr=StewMgr
    },
    {ok, State}.

% gen_server terminate callback: tells the update process to stop, stops the
% three group managers and closes the database file.
terminate(_Reason, #main{db=Db} = Main) ->
    Main#main.update_pid ! close,
    couch_view_group:stop(Main#main.view_group_mgr),
    couch_view_group:stop(Main#main.temp_view_group_mgr),
    couch_stew_group:stop(Main#main.stew_group_mgr),
    couch_file:close(Db#db.fd).
% gen_server call handler.
%
% get_stew_group / get_view_group / get_temp_view_group: the reply is left
%   to the group manager, which receives From (get_group_async).
% update_docs: forwarded to the update process, which replies to From.
% get_db: returns the current #db{} record.
% update_*_group: starts a group update with the given notification fun.
% db_updated: replaces the #db{} record held by the server.
handle_call({get_stew_group, ViewGroupDocId}, From, #main{db=Db}=Main) ->
    case get_doc_info(Db, ViewGroupDocId) of
    {ok, #doc_info{deleted=true}} ->
        {reply, {not_found, deleted}, Main};
    {ok, DocInfo} ->
        ok = couch_stew_group:get_group_async(Main#main.stew_group_mgr, DocInfo, From),
        {noreply, Main};
    not_found ->
        {reply, {not_found, missing}, Main}
    end;
handle_call({get_view_group, ViewGroupDocId}, From, #main{db=Db}=Main) ->
    case get_doc_info(Db, ViewGroupDocId) of
    {ok, #doc_info{deleted=true}} ->
        {reply, {not_found, deleted}, Main};
    {ok, DocInfo} ->
        ok = couch_view_group:get_group_async(Main#main.view_group_mgr, DocInfo, From),
        {noreply, Main};
    not_found ->
        {reply, {not_found, missing}, Main}
    end;
handle_call({get_temp_view_group, MapFunSrc}, From, Main) ->
    TempMgr = Main#main.temp_view_group_mgr,
    ok = couch_view_group:get_group_async(TempMgr, MapFunSrc, From),
    {noreply, Main};
handle_call({update_docs, DocActions, Options}, From, Main) ->
    Main#main.update_pid ! {From, update_docs, DocActions, Options},
    {noreply, Main};
handle_call(get_db, _From, #main{db=Db}=Main) ->
    {reply, {ok, Db}, Main};
handle_call({update_stew_group, Id, UpdateNotifFun}, _From, #main{db=Db}=Main) ->
    case get_doc_info(Db, Id) of
    {ok, DocInfo} ->
        ok = couch_stew_group:update_group(Main#main.stew_group_mgr, DocInfo, UpdateNotifFun),
        {reply, ok, Main};
    Error ->
        {reply, Error, Main}
    end;
handle_call({update_view_group, Id, UpdateNotifFun}, _From, #main{db=Db}=Main) ->
    case get_doc_info(Db, Id) of
    {ok, DocInfo} ->
        ok = couch_view_group:update_group(Main#main.view_group_mgr, DocInfo, UpdateNotifFun),
        {reply, ok, Main};
    Error ->
        {reply, Error, Main}
    end;
handle_call({update_temp_view_group, Src, UpdateNotifFun}, _From, Main) ->
    TempMgr = Main#main.temp_view_group_mgr,
    Reply = couch_view_group:update_group(TempMgr, Src, UpdateNotifFun),
    {reply, Reply, Main};
handle_call({db_updated, NewDb}, _From, Main) ->
    {reply, ok, Main#main{db=NewDb}}.


% gen_server cast handler; only the placeholder message foo is accepted.
handle_cast(foo, Main) ->
    {noreply, Main}.
%%% Internal function %%%

% Resolves the pid of the couch_db child of the given supervisor by asking
% the supervisor to start a child with the same id and reading the pid out
% of the {already_started, Pid} error.
db_pid(SupPid) ->
    ProbeSpec =
        {couch_db,
            {couch_db, sup_start_link, []},
            permanent,
            brutal_kill,
            worker,
            [couch_db]},
    {error, {already_started, DbPid}} = supervisor:start_child(SupPid, ProbeSpec),
    DbPid.

% Receive loop of the update process spawned in init_main/4. It owns the
% writable #db{} state; after every change it hands the new record to the
% db server (MainPid) with a db_updated call.
update_loop(MainPid, Db) ->
    receive
    {set_view_group_mgr, ViewMgr} ->
        update_loop(MainPid, Db#db{view_group_mgr=ViewMgr});
    {set_stew_group_mgr, StewMgr} ->
        update_loop(MainPid, Db#db{stew_group_mgr=StewMgr});
    {OrigFrom, update_docs, DocActions, Options} ->
        {ok, DocResults, UpdatedDb} = update_docs_int(Db, DocActions, Options),
        ok = gen_server:call(MainPid, {db_updated, UpdatedDb}),
        couch_db_update_notifier:notify_all(UpdatedDb#db.name),
        % the original caller of the update_docs call is answered from here
        gen_server:reply(OrigFrom, {ok, DocResults}),
        update_loop(MainPid, UpdatedDb);
    {view_group_updated, #doc_info{id=Id}=GroupDocInfo, _UpdateStatus, ViewGroupInfo} ->
        case get_doc_info(Db, Id) of
        {ok, GroupDocInfo} ->
            % rev on disk matches the rev of the view group being updated
            % so we save the info to disk
            {ok, NewGroupBtree} =
                couch_btree:add_remove(Db#db.view_group_btree, [{Id, ViewGroupInfo}], []),
            DirtyDb = Db#db{view_group_btree=NewGroupBtree, uncommitted_writes=true},
            {ok, CommittedDb} = commit_outstanding(DirtyDb),
            ok = gen_server:call(MainPid, {db_updated, CommittedDb}),
            update_loop(MainPid, CommittedDb);
        _Else ->
            % doesn't match, don't save in btree
            update_loop(MainPid, Db)
        end;
    {stew_group_updated, #doc_info{id=Id}=GroupDocInfo, _UpdateStatus, ViewGroupInfo} ->
        case get_doc_info(Db, Id) of
        {ok, GroupDocInfo} ->
            % rev on disk matches the rev of the view group being updated
            % so we save the info to disk
            {ok, NewGroupBtree} =
                couch_btree:add_remove(Db#db.stew_group_btree, [{Id, ViewGroupInfo}], []),
            DirtyDb = Db#db{stew_group_btree=NewGroupBtree, uncommitted_writes=true},
            {ok, CommittedDb} = commit_outstanding(DirtyDb),
            ok = gen_server:call(MainPid, {db_updated, CommittedDb}),
            update_loop(MainPid, CommittedDb);
        _Else ->
            % doesn't match, don't save in btree
            update_loop(MainPid, Db)
        end;
    close ->
        % terminate loop
        ok
    end.
% Returns {ok, {Queries, GroupState}} for the stew group of the given
% document; see group_info_from_btree_or_doc/3.
get_stew_group_info(#db{stew_group_btree=GroupBtree}=Db, #doc_info{}=DocInfo) ->
    group_info_from_btree_or_doc(GroupBtree, Db, DocInfo).

% Returns {ok, {Queries, GroupState}} for the view group of the given
% document; see group_info_from_btree_or_doc/3.
get_view_group_info(#db{view_group_btree=GroupBtree}=Db, #doc_info{}=DocInfo) ->
    group_info_from_btree_or_doc(GroupBtree, Db, DocInfo).

% Looks the document id up in the given group btree. When nothing is stored
% there, the document itself is opened and its view functions are returned
% with a nil group state; a document without view functions yields
% {not_found, no_views_found}. Failures to open the document are passed on.
group_info_from_btree_or_doc(GroupBtree, Db, #doc_info{id=Id}=DocInfo) ->
    case couch_btree:lookup_single(GroupBtree, Id) of
    {ok, {ViewQueries, ViewGroupState}} ->
        {ok, {ViewQueries, ViewGroupState}};
    not_found ->
        case open_doc_int(Db, DocInfo, []) of
        {ok, Doc} ->
            case couch_doc:get_view_functions(Doc) of
            none ->
                {not_found, no_views_found};
            Queries ->
                {ok, {Queries, nil}}
            end;
        OpenFailure ->
            OpenFailure
        end
    end.

% Fetches the current #db{} record from the db server.
get_db(DbPid) ->
    {ok, Db} = gen_server:call(DbPid, get_db),
    Db.

% Opens the given revisions of a document.
% Revs is either the atom all (every leaf revision) or a list of revisions;
% with the latest option the leafs descending from the listed revisions are
% returned instead of the revisions themselves.
% Returns {ok, Results} where each result is {ok, Doc} or
% {{not_found, missing}, Rev}.
open_doc_revs_int(Db, Id, Revs, Options) ->
    case get_full_doc_info(Db, Id) of
    {ok, {_DocInfo, RevTree}} ->
        {FoundRevs, MissingRevs} =
            case Revs of
            all ->
                {get_all_leafs(RevTree, []), []};
            _ ->
                case lists:member(latest, Options) of
                true ->
                    get_rev_leafs(RevTree, Revs, []);
                false ->
                    get_revs(RevTree, Revs, [])
                end
            end,
        OpenFound =
            fun({Rev, FoundSummaryPtr, FoundRevPath}) ->
                case summary_ptr_type(FoundSummaryPtr) of
                missing ->
                    % we have the rev in our list but know nothing about it
                    {{not_found, missing}, Rev};
                deletion ->
                    {ok, make_doc(Db, Id, FoundSummaryPtr, FoundRevPath)};
                disk ->
                    {ok, make_doc(Db, Id, FoundSummaryPtr, FoundRevPath)}
                end
            end,
        FoundResults = lists:map(OpenFound, FoundRevs),
        MissingResults = [{{not_found, missing}, MissingRev} || MissingRev <- MissingRevs],
        {ok, FoundResults ++ MissingResults};
    not_found when Revs == all ->
        {ok, []};
    not_found ->
        {ok, [{{not_found, missing}, Rev} || Rev <- Revs]}
    end.
% Opens a document. The second argument selects the lookup:
%   ?NON_REP_DOC_PREFIX ++ Id      - read from the local docs btree
%   #doc_info{}                    - open the revision named in the doc info
%   {#doc_info{}, RevTree}         - same, with the revision trees at hand
%   anything else                  - treated as an id and looked up first
% A deleted document is only returned when the allow_stub option is given.
open_doc_int(Db, ?NON_REP_DOC_PREFIX ++ Id, _Options) ->
    case couch_btree:lookup_single(Db#db.local_docs_btree, Id) of
    {ok, {Rev, BodyData}} ->
        {ok, #doc{id=?NON_REP_DOC_PREFIX ++ Id, revs=[Rev], body=BodyData}};
    not_found ->
        {not_found, missing}
    end;
open_doc_int(Db, #doc_info{rev=Rev,summary_pointer=Sp}=DocInfo, Options) ->
    % no revision trees available: use a single node for the current rev
    SingleNodeTree = [{Rev, Sp, []}],
    open_doc_int(Db, {DocInfo, SingleNodeTree}, Options);
open_doc_int(Db, {#doc_info{id=Id,rev=Rev,summary_pointer=Sp,deleted=Deleted}=DocInfo, RevTree}, Options) ->
    case (not Deleted) orelse lists:member(allow_stub, Options) of
    true ->
        {[{_,_, RevPath}], []} = get_revs(RevTree, [Rev], []),
        Doc = make_doc(Db, Id, Sp, RevPath),
        Meta = doc_meta_info(DocInfo, RevTree, Options),
        {ok, Doc#doc{meta=Meta}};
    false ->
        {not_found, deleted}
    end;
open_doc_int(Db, Id, Options) ->
    case get_full_doc_info(Db, Id) of
    {ok, {DocInfo, RevisionTree}} ->
        open_doc_int(Db, {DocInfo, RevisionTree}, Options);
    not_found ->
        {not_found, missing}
    end.

% Builds the meta list of a document from the requested options:
%   revs_info         - {revs_info, [{Rev, PointerType}]} along the rev path
%   conflicts         - {conflicts, Revs}, omitted when there are none
%   deleted_conflicts - {deleted_conflicts, Revs}, omitted when there are none
doc_meta_info(DocInfo, RevTree, Options) ->
    RevsInfo =
        case lists:member(revs_info, Options) of
        false ->
            [];
        true ->
            {[RevPath],[]} = get_full_rev_paths(RevTree, [DocInfo#doc_info.rev], []),
            [{revs_info, [{Rev0, summary_ptr_type(Sp0)} || {Rev0, Sp0} <- RevPath]}]
        end,
    Conflicts =
        case lists:member(conflicts, Options) of
        false ->
            [];
        true ->
            case DocInfo#doc_info.conflict_revs of
            [] -> [];
            _ -> [{conflicts, DocInfo#doc_info.conflict_revs}]
            end
        end,
    DeletedConflicts =
        case lists:member(deleted_conflicts, Options) of
        false ->
            [];
        true ->
            case DocInfo#doc_info.deleted_conflict_revs of
            [] -> [];
            _ -> [{deleted_conflicts, DocInfo#doc_info.deleted_conflict_revs}]
            end
        end,
    RevsInfo ++ Conflicts ++ DeletedConflicts.
% rev tree functions

% Merges two lists of revision trees. A tree node is {RevId, Value, SubTrees}.
% Nodes with equal RevId are merged recursively (the value of the node from
% the first list is kept); otherwise the node with the smaller RevId comes
% first, so sibling lists that were ordered by RevId stay ordered.
merge_rev_trees([], B) ->
    B;
merge_rev_trees(A, []) ->
    A;
merge_rev_trees([ATree | ANextTree], [BTree | BNextTree]) ->
    {ARev, ADoc, ASubTrees} = ATree,
    {BRev, _BDoc, BSubTrees} = BTree,
    if
    ARev == BRev ->
        %same rev
        MergedSubTrees = merge_rev_trees(ASubTrees, BSubTrees),
        MergedNextTrees = merge_rev_trees(ANextTree, BNextTree),
        [{ARev, ADoc, MergedSubTrees} | MergedNextTrees];
    ARev < BRev ->
        [ATree | merge_rev_trees(ANextTree, [BTree | BNextTree])];
    true ->
        [BTree | merge_rev_trees([ATree | ANextTree], BNextTree)]
    end.

% Returns the revisions from SrcRevs that occur nowhere in the given trees.
find_missing_revs([], _Trees) ->
    [];
find_missing_revs(SrcRevs, []) ->
    SrcRevs;
find_missing_revs(SrcRevs, [{RevId, _, SubTrees} | RestTrees]) ->
    SrcRevs2 = lists:delete(RevId, SrcRevs),
    SrcRevs3 = find_missing_revs(SrcRevs2, SubTrees),
    find_missing_revs(SrcRevs3, RestTrees).

% get the latest leaf revs for the found rev.
% Often these are the same rev.
%
% Returns {LeafsFound, RevsNotFound}; each leaf is
% {LeafRevId, SummaryPtr, RevPath} as produced by get_all_leafs/2.
get_rev_leafs(_Trees, [], _RevPathAcc) ->
    {[], []};
get_rev_leafs([], RevsToGet, _RevPathAcc) ->
    {[], RevsToGet};
get_rev_leafs([{RevId, _SummaryPtr, SubTrees}=Tree | RestTrees], RevsToGet, RevPathAcc) ->
    case lists:member(RevId, RevsToGet) of
    true ->
        % found it
        LeafsFound = get_all_leafs([Tree], RevPathAcc),
        LeafRevsFound = [LeafRevFound || {LeafRevFound, _, _} <- LeafsFound],
        % The requested rev is satisfied by its leafs, so it has to leave
        % the wanted list even when it is an interior node. Removing only
        % the leaf revs would report a found interior rev as not found.
        RevsToGet2 = lists:delete(RevId, RevsToGet) -- LeafRevsFound,
        {RestLeafsFound, RevsRemaining} = get_rev_leafs(RestTrees, RevsToGet2, RevPathAcc),
        {LeafsFound ++ RestLeafsFound, RevsRemaining};
    false ->
        {LeafsFound, RevsToGet2} = get_rev_leafs(SubTrees, RevsToGet, [RevId | RevPathAcc]),
        {RestLeafsFound, RevsRemaining} = get_rev_leafs(RestTrees, RevsToGet2, RevPathAcc),
        {LeafsFound ++ RestLeafsFound, RevsRemaining}
    end.

% Looks up the given revisions anywhere in the trees.
% Returns {Found, RevsNotFound}; each found entry is {Rev, Pointer, RevPath}
% where RevPath starts with Rev itself, followed by the revisions
% accumulated on the way down to it.
get_revs(Tree, RevsToGet, RevPathAcc) ->
    {RevPaths, RevsNotFound} = get_full_rev_paths(Tree, RevsToGet, RevPathAcc),
    FixedResults =
        [{Rev, Pointer, [Rev0 || {Rev0, _} <- Path]}
            || [{Rev, Pointer}|_] = Path <- RevPaths],
    {FixedResults, RevsNotFound}.
% Searches the trees for the wanted revisions.
% Returns {Paths, RevsNotFound}; each path is a list of {RevId, SummaryPtr}
% pairs starting with the found node, followed by the pairs accumulated on
% the way down to it.
get_full_rev_paths([], RevsToGet, _RevPathAcc) ->
    {[], RevsToGet};
get_full_rev_paths([{RevId, SummaryPtr, SubTrees} | RestTrees], RevsToGet, RevPathAcc) ->
    PathToHere = [{RevId, SummaryPtr} | RevPathAcc],
    StillWanted = RevsToGet -- [RevId],
    HereResult =
        case StillWanted == RevsToGet of
        true ->
            % not in the rev list.
            [];
        false ->
            % this node is the rev list. return it
            [PathToHere]
        end,
    {BelowResults, WantedAfterSubTrees} =
        get_full_rev_paths(SubTrees, StillWanted, PathToHere),
    {SiblingResults, WantedAfterSiblings} =
        get_full_rev_paths(RestTrees, WantedAfterSubTrees, RevPathAcc),
    {HereResult ++ BelowResults ++ SiblingResults, WantedAfterSiblings}.

% Returns every leaf of the trees as {RevId, SummaryPtr, RevPath}, where
% RevPath starts with the leaf's own RevId.
get_all_leafs([], _RevPathAcc) ->
    [];
get_all_leafs([{RevId, SummaryPtr, []} | RestTrees], RevPathAcc) ->
    Leaf = {RevId, SummaryPtr, [RevId | RevPathAcc]},
    [Leaf | get_all_leafs(RestTrees, RevPathAcc)];
get_all_leafs([{RevId, _SummaryPtr, SubTrees} | RestTrees], RevPathAcc) ->
    LeafsBelow = get_all_leafs(SubTrees, [RevId | RevPathAcc]),
    LeafsBelow ++ get_all_leafs(RestTrees, RevPathAcc).

% Turns a revision list into a single-branch tree. The list is reversed
% first, so its first element becomes the leaf.
rev_list_to_trees(Doc, RevIds) ->
    rev_list_to_trees2(Doc, lists:reverse(RevIds)).

% Builds the branch top-down: the last revision carries Doc as its value,
% every node above it carries the "missing" summary pointer.
rev_list_to_trees2(Doc, [RevId]) ->
    [{RevId, Doc, []}];
rev_list_to_trees2(Doc, [RevId | Rest]) ->
    Below = rev_list_to_trees2(Doc, Rest),
    [{RevId, type_to_summary_ptr(missing), Below}].
% Picks the winning leaf revision of a set of revision trees.
% Returns {RevId, SummaryPointer, Path, ConflictRevs, DeletedConflictRevs}:
% the first three describe the winner, the last two list the remaining
% leaf revisions, split into non-deleted and deleted ones.
% Fails with a badmatch when the trees contain no leaf at all.
winning_rev(Trees) ->
    % sort descending by {not deleted, then Depth, then RevisionId}
    SortKey =
        fun({LeafRevId, LeafPointer, LeafPath}) ->
            {summary_ptr_type(LeafPointer) /= deletion, length(LeafPath), LeafRevId}
        end,
    Descending = fun(LeafA, LeafB) -> SortKey(LeafA) > SortKey(LeafB) end,
    SortedLeafRevs = lists:sort(Descending, get_all_leafs(Trees, [])),

    [{RevId, SummaryPointer, Path} | Losers] = SortedLeafRevs,

    % non-deleted leafs sort first, so the losers split cleanly at the
    % first deletion
    IsNotDeletion =
        fun({_ConflictRevId, LoserPointer, _}) ->
            summary_ptr_type(LoserPointer) /= deletion
        end,
    {ConflictRevTuples, DeletedConflictRevTuples} = lists:splitwith(IsNotDeletion, Losers),

    ConflictRevs = [RevId1 || {RevId1, _, _} <- ConflictRevTuples],
    DeletedConflictRevs = [RevId2 || {RevId2, _, _} <- DeletedConflictRevTuples],

    {RevId, SummaryPointer, Path, ConflictRevs, DeletedConflictRevs}.

% Returns {ConflictRevs, DeletedConflictRevs} for the trees; empty trees
% have no conflicts.
get_conflict_revs([]) ->
    {[], []};
get_conflict_revs(Trees) ->
    {_, _, _, ConflictRevs, DeletedConflictRevs} = winning_rev(Trees),
    {ConflictRevs, DeletedConflictRevs}.

% Flushes to disk any outstanding revs (document records where summary pointers should be)
% and replaces the documents with their SummaryPointers in the returned trees.
% Walks the trees node by node (node first, then its subtrees, then its
% siblings) and replaces every #doc{} value by a summary pointer.
flush_rev_trees(_Db, []) ->
    [];
flush_rev_trees(Db, [{RevId, Value, SubTrees} | RestTrees]) ->
    Pointer = flush_rev_value(Db, Value),
    FlushedSubTrees = flush_rev_trees(Db, SubTrees),
    [{RevId, Pointer, FlushedSubTrees} | flush_rev_trees(Db, RestTrees)].

% Converts one tree node value into a summary pointer:
% a deleted #doc{} becomes the deletion pointer, any other #doc{} is written
% to the summary stream, and a value that is not a #doc{} is kept as is.
flush_rev_value(_Db, #doc{deleted=true}) ->
    type_to_summary_ptr(deletion);
flush_rev_value(Db, #doc{}=Doc) ->
    % all bins must be flushed stream pointers with the same Fd as this db
    % (attachment entries of any other shape are skipped by this
    % comprehension, and the Fd itself is not compared)
    Bins = [{BinName, {BinType, BinSp, BinLen}}
        || {BinName, {BinType, {_Fd, BinSp, BinLen}}} <- Doc#doc.attachments],
    {ok, SummaryPointer} =
        couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}),
    SummaryPointer;
flush_rev_value(_Db, SummaryPointer) ->
    SummaryPointer.

% Builds a #doc{} for the given summary pointer. Body and attachments are
% only read from the summary stream for "disk" pointers; attachment
% pointers are paired with this database's file descriptor.
make_doc(Db, Id, SummaryPointer, RevisionPath) ->
    PointerType = summary_ptr_type(SummaryPointer),
    {BodyData, BinValues} =
        case PointerType == disk of
        true ->
            {ok, {StoredBody, StoredBins}} =
                couch_stream:read_term(Db#db.summary_stream, SummaryPointer),
            {StoredBody,
                [{Name, {Type, {Db#db.fd, Sp, Len}}} || {Name, {Type, Sp, Len}} <- StoredBins]};
        false ->
            {[], []}
        end,
    #doc{
        id = Id,
        revs = RevisionPath,
        body = BodyData,
        attachments = BinValues,
        deleted = (PointerType == deletion)
    }.

% Reserved summary pointer values: 0 stands for "missing", 1 for "deletion".
type_to_summary_ptr(missing) -> 0;
type_to_summary_ptr(deletion) -> 1.

% Classifies a summary pointer; everything but the two reserved values is a
% pointer to data on disk.
summary_ptr_type(0) -> missing;
summary_ptr_type(1) -> deletion;
summary_ptr_type(_Pointer) -> disk.
% Applies a list of per-document updates to the revision trees and writes
% the new document summaries.
%
% Each input element is
%   {Id, {Docs, Options}, {DiskUpdateSeq, DiskRevision, DiskSummaryPointer, DiskRevTrees}}
% where the last tuple describes what is currently stored for the document.
% The four accumulators are built in reverse and reversed at the end.
%
% Returns {ok, DocResults, InfoById, RemoveSeqs, InfoBySeq, Db}:
%   DocResults - per document a list of {ok, Rev} or {conflict, WinningRev}
%   InfoById   - new entries for the by-id btree
%   RemoveSeqs - old update sequences to remove from the by-seq btree
%   InfoBySeq  - new entries for the by-seq btree
%   Db         - #db{} with last_update_seq, doc_count and
%                uncommitted_writes updated
%
% With the new_edits option exactly one document is expected and a new
% random revision id is generated for it; without it the given revision
% lists are merged into the stored trees as they are.
write_summaries(Db, [], InfoBySeqOut, RemoveSeqOut, InfoByIdOut, DocResultOut) ->
    {ok, lists:reverse(DocResultOut), lists:reverse(InfoByIdOut),
        lists:reverse(RemoveSeqOut), lists:reverse(InfoBySeqOut), Db};
write_summaries(Db,
        [{Id, {Docs, Options}, {DiskUpdateSeq, _DiskRevision, DiskSummaryPointer, DiskRevTrees}} | Rest],
        InfoBySeqOut, RemoveSeqOut, InfoByIdOut, DocResultOut) ->
    NewRevs =
        case lists:member(new_edits, Options) of
        true ->
            [#doc{revs=DocRevs}=Doc]=Docs,
            case DocRevs of
            [] when DiskRevTrees == [] ->
                % success: new document creation, no existing doc
                NewRev = integer_to_list(couch_util:rand32()),
                {rev_list_to_trees(Doc, [NewRev]), [NewRev]};
            [] ->
                % new document creation, existing doc
                {_, WinSummaryPtr, WinPath, _, _} = winning_rev(DiskRevTrees),
                case summary_ptr_type(WinSummaryPtr) of
                deletion ->
                    % success: a new document on top of a deletion
                    NewRev = integer_to_list(couch_util:rand32()),
                    NewTrees = rev_list_to_trees(Doc, [NewRev | WinPath]),
                    {merge_rev_trees(DiskRevTrees, NewTrees), [NewRev]};
                _ ->
                    % conflict
                    conflict
                end;
            [BasedOnRev|_] ->
                Leafs = get_all_leafs(DiskRevTrees, []),
                case lists:keysearch(BasedOnRev, 1, Leafs) of
                false ->
                    % Can't find the based-on rev among the leaf revs.
                    % Consider that a conflict.
                    conflict;
                {value, {_, _, BasedOnRevs}} ->
                    % found the rev we are looking for! Create a new rev.
                    NewRev = integer_to_list(couch_util:rand32()),
                    NewTrees = rev_list_to_trees(Doc, [NewRev | BasedOnRevs]),
                    {merge_rev_trees(DiskRevTrees, NewTrees), [NewRev]}
                end
            end;
        false ->
            % not new edits, merge the revisions into the tree
            % NOTE(review): the fold prepends, so OutputRevs0 (and with it
            % the per-doc results) is in reverse order of Docs - confirm
            % whether callers rely on that.
            {InputRevTrees, OutputRevs0} = lists:foldl(
                fun(#doc{revs=Revisions}=Doc, {AccTrees, AccRevs}) ->
                    DocRevTree = rev_list_to_trees(Doc, Revisions),
                    NewRevTrees = merge_rev_trees(AccTrees, DocRevTree),
                    {NewRevTrees, [lists:nth(1, Revisions) | AccRevs]}
                end,
                {[], []}, Docs),
            {merge_rev_trees(DiskRevTrees, InputRevTrees), OutputRevs0}
        end,
    case NewRevs of
    conflict ->
        % A conflict can also be raised for a document that is not on disk
        % at all (an edit naming a base rev of a nonexistent document).
        % winning_rev/1 fails on empty trees, which would take down the
        % update process, so report the empty revision in that case.
        WinningRevision =
            case DiskRevTrees of
            [] ->
                "";
            _ ->
                {DiskWinningRev, _, _, _, _} = winning_rev(DiskRevTrees),
                DiskWinningRev
            end,
        DocResultOut2 = [[{conflict, WinningRevision} || _Doc <- Docs] | DocResultOut],
        write_summaries(Db, Rest, InfoBySeqOut, RemoveSeqOut, InfoByIdOut, DocResultOut2);
    {NewRevTrees, OutputRevs} ->
        FlushedTrees = flush_rev_trees(Db, NewRevTrees),
        {WinningRevision, WinningSummaryPointer, _, ConflictRevs, DeletedConflictRevs} =
            winning_rev(FlushedTrees),

        % the document count only counts documents whose winning revision
        % points at data on disk
        OldDiskDocuments =
            case summary_ptr_type(DiskSummaryPointer) == disk of
            true -> 1;
            false -> 0
            end,
        NewDiskDocuments =
            case summary_ptr_type(WinningSummaryPointer) == disk of
            true -> 1;
            false -> 0
            end,
        NewDocCount = Db#db.doc_count + NewDiskDocuments - OldDiskDocuments,

        UpdateSeq = Db#db.last_update_seq + 1,

        % a disk update seq of 0 means there is no old by-seq entry to remove
        RemoveSeqOut2 =
            case DiskUpdateSeq of
            0 -> RemoveSeqOut;
            _ -> [DiskUpdateSeq | RemoveSeqOut]
            end,

        InfoBySeqOut2 =
            [{UpdateSeq, {Id, WinningRevision, WinningSummaryPointer, ConflictRevs, DeletedConflictRevs}}
                | InfoBySeqOut],
        InfoByIdOut2 =
            [{Id, {UpdateSeq, WinningRevision, WinningSummaryPointer, FlushedTrees}}
                | InfoByIdOut],

        % output an ok and the revid for each successful save
        DocResultOut2 = [[{ok, OutputRev} || OutputRev <- OutputRevs] | DocResultOut],
        Db2 = Db#db{last_update_seq = UpdateSeq, uncommitted_writes=true, doc_count=NewDocCount},
        write_summaries(Db2, Rest, InfoBySeqOut2, RemoveSeqOut2, InfoByIdOut2, DocResultOut2)
    end.
%% Applies a batch of document updates to the database state.
%%
%% DocsOptionsList is a list of {Docs, UpdateOptions} tuples, where Docs is a
%% non-empty list of #doc{} records for one document id. Entries whose id
%% starts with ?NON_REP_DOC_PREFIX and that carry exactly one #doc{} are
%% handled by update_nr_docs/2 (with the prefix stripped); everything else is
%% written through write_summaries/6 and indexed in the by-id and by-seq btrees.
%%
%% Unless Options contains the atom delay_commit, the header is committed
%% before returning.
%%
%% Returns {ok, Results, NewDb}; Results holds the regular document results
%% followed by the non-replicated document results.
update_docs_int(Db, DocsOptionsList, Options) ->
    #db{
        docinfo_by_Id_btree = DocInfoByIdBTree,
        docinfo_by_seq_btree = DocInfoBySeqBTree,
        view_group_btree = ViewGroupBTree,
        view_group_mgr = ViewGroupMgr,
        stew_group_btree = StewGroupBTree,
        stew_group_mgr = StewGroupMgr
    } = Db,

    % separate out the NonRep documents from the rest of the documents
    {Ids, DocsOptionsList2, NonRepDocs} =
        lists:foldl(
            fun({[#doc{id=Id}=Doc | Rest], _Options}=DocOptions,
                    {IdsAcc, DocsOptionsAcc, NonRepDocsAcc}) ->
                case Id of
                    ?NON_REP_DOC_PREFIX ++ NRDocId when Rest==[] ->
                        % when saving NR (non rep) documents, you can only
                        % save a single rev
                        {IdsAcc, DocsOptionsAcc, [Doc#doc{id=NRDocId} | NonRepDocsAcc]};
                    _ ->
                        {[Id | IdsAcc], [DocOptions | DocsOptionsAcc], NonRepDocsAcc}
                end
            end,
            {[], [], []}, DocsOptionsList),

    {ok, OldDocInfoResults} = couch_btree:lookup(DocInfoByIdBTree, Ids),

    % create a list of {DocId, {Docs, UpdateOptions}, OldDiskInfo} tuples.
    DocsAndOldDocInfo =
        lists:zipwith3(
            fun(DocId, DocsOptions, OldDocInfoLookupResult) ->
                case OldDocInfoLookupResult of
                    {ok, {_Id, {DiskUpdateSeq, DiskRevision, DiskSummaryPointer, DiskRevTrees}}} ->
                        {DocId, DocsOptions,
                            {DiskUpdateSeq, DiskRevision, DiskSummaryPointer, DiskRevTrees}};
                    {not_found, _} ->
                        % NOTE(review): the clause above orders this tuple as
                        % {UpdateSeq, Revision, SummaryPointer, RevTrees}, so
                        % this placeholder carries "" as the update seq and 0
                        % as the revision. That looks swapped; confirm against
                        % the head of write_summaries/6 before changing it.
                        {DocId, DocsOptions, {"", 0, 0, []}}
                end
            end,
            Ids, DocsOptionsList2, OldDocInfoResults),

    % now write out the documents
    {ok, DocResults, InfoById, RemoveSeqList, InfoBySeqList, Db2} =
        write_summaries(Db, DocsAndOldDocInfo, [], [], [], []),

    % and the indexes to the documents
    {ok, DocInfoBySeqBTree2} =
        couch_btree:add_remove(DocInfoBySeqBTree, InfoBySeqList, RemoveSeqList),
    {ok, DocInfoByIdBTree2} =
        couch_btree:add_remove(DocInfoByIdBTree, InfoById, []),

    % do the nr documents
    {ok, NRUpdateResults, Db3} = update_nr_docs(Db2, NonRepDocs),

    % clear the computed view cache on disk
    UpdatedIds = [UpdatedId || {UpdatedId, _DocInfo} <- InfoById],
    {ok, ViewGroupBTree2} = couch_btree:add_remove(ViewGroupBTree, [], UpdatedIds),
    {ok, StewGroupBTree2} = couch_btree:add_remove(StewGroupBTree, [], UpdatedIds),

    % now notify the group managers to discard any of the groups they have
    % in memory
    OldDocInfos = lists:map(
        fun({OldId, _Docs, {OldUpdateSeq, OldRev, OldSummaryPointer, OldRevTrees}}) ->
            {ConflictRevs, DeletedConflictRevs} = get_conflict_revs(OldRevTrees),
            #doc_info{
                id = OldId,
                update_seq = OldUpdateSeq,
                rev = OldRev,
                summary_pointer = OldSummaryPointer,
                conflict_revs = ConflictRevs,
                deleted_conflict_revs = DeletedConflictRevs
            }
        end,
        DocsAndOldDocInfo),

    ok = couch_view_group:free_groups(ViewGroupMgr, OldDocInfos),
    ok = couch_stew_group:free_groups(StewGroupMgr, OldDocInfos),

    Db4 = Db3#db{
        docinfo_by_Id_btree = DocInfoByIdBTree2,
        docinfo_by_seq_btree = DocInfoBySeqBTree2,
        view_group_btree = ViewGroupBTree2,
        stew_group_btree = StewGroupBTree2,
        uncommitted_writes = true
    },
    % bind the result of the case instead of exporting a variable from its
    % branches
    {ok, Db5} =
        case lists:member(delay_commit, Options) of
            true -> {ok, Db4};
            false -> commit_outstanding(Db4)
        end,
    {ok, DocResults ++ NRUpdateResults, Db5}.

%% Saves or deletes non-replicated (local) documents in local_docs_btree.
%%
%% Each document is checked against the revision currently stored for its id:
%%   - stored doc, first supplied rev equals the stored rev -> {ok, NewRev}
%%   - stored doc, any other rev (including none)          -> {conflict, StoredRev}
%%   - no stored doc, no rev, not a delete                 -> {ok, NewRev}
%%   - no stored doc, anything else                        -> {conflict, ""}
%%
%% Returns {ok, Results, NewDb}, where Results has one single-element list per
%% input document, in the order of NRDocs.
update_nr_docs(#db{local_docs_btree = LocalDocsBtree} = Db, NRDocs) ->
    {ok, NROldRevSummaries} =
        couch_btree:lookup(LocalDocsBtree, [Id || #doc{id=Id} <- NRDocs]),
    NRResults = lists:zipwith(
        fun(#doc{deleted=Del, revs=Revs}=NRDoc, OldRevSummary) ->
            NewRev =
                case Revs of
                    [Rev|_] -> Rev;
                    [] -> ""
                end,
            case {Del, NewRev, OldRevSummary} of
                {_, _, {ok, {_, {OldRev, _}}}} when NewRev == OldRev ->
                    {NRDoc, {ok, integer_to_list(couch_util:rand32())}};
                {_, _, {ok, {_, {OldRev, _}}}} ->
                    {NRDoc, {conflict, OldRev}};
                {false, "", {not_found, _}} ->
                    {NRDoc, {ok, integer_to_list(couch_util:rand32())}};
                {true, "", _} ->
                    % cannot delete an unspecified rev
                    {NRDoc, {conflict, ""}};
                {_, _, {not_found, _}} ->
                    % a rev was supplied but no document is stored under this
                    % id; report a conflict rather than failing the whole
                    % update with a case_clause error
                    {NRDoc, {conflict, ""}}
            end
        end,
        NRDocs, NROldRevSummaries),

    % only successful results are applied to the btree
    NRIdsSummaries = [{Id, {Rev, NRBody}} ||
        {#doc{id=Id, body=NRBody, deleted=false}, {ok, Rev}} <- NRResults],
    NRIdsDelete = [Id || {#doc{id=Id, deleted=true}, {ok, _}} <- NRResults],

    {ok, LocalDocsBtree2} =
        couch_btree:add_remove(LocalDocsBtree, NRIdsSummaries, NRIdsDelete),

    NRUpdateResults = [[Result] || {_Doc, Result} <- NRResults],
    {ok, NRUpdateResults, Db#db{local_docs_btree = LocalDocsBtree2}}.

%% Commits outstanding writes: syncs the data, writes a new header holding the
%% current stream/btree states, then syncs again so the header reaches disk.
%% When uncommitted_writes is not true the database is returned unchanged.
%%
%% Returns {ok, Db}.
commit_outstanding(#db{fd=Fd, uncommitted_writes=true, header=Header} = Db) ->
    ok = couch_file:sync(Fd), % commit outstanding data
    Header2 = Header#db_header{
        last_update_seq = Db#db.last_update_seq,
        summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
        docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
        docinfo_by_Id_btree_state = couch_btree:get_state(Db#db.docinfo_by_Id_btree),
        view_group_btree_state = couch_btree:get_state(Db#db.view_group_btree),
        % the stew group btree is updated together with the view group btree
        % on every write, so its state is persisted with it
        stew_group_btree_state = couch_btree:get_state(Db#db.stew_group_btree),
        local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
        doc_count = Db#db.doc_count
    },
    {ok, Header3} = write_header(Fd, Header2),
    ok = couch_file:sync(Fd), % commit header to disk
    Db2 = Db#db{
        uncommitted_writes = false,
        header = Header3
    },
    {ok, Db2};
commit_outstanding(Db) ->
    {ok, Db}.

%% Serializes Header (with write_version incremented) into a ?HEADER_SIZE byte
%% segment and writes that segment twice, back to back, at offset 0 of Fd.
%%
%% Returns {ok, WrittenHeader}, or {error, error_header_too_large} when the
%% encoded header does not fit into ?HEADER_SIZE bytes.
write_header(Fd, Header) ->
    H2 = Header#db_header{write_version = Header#db_header.write_version + 1},
    % The leading bytes in every db file, the sig and the file version:
    HeaderPrefix = ?DB_FILE_SIG,
    FileVersion = <<(?FILE_VERSION):16>>,
    % the actual header data
    TermBin = term_to_binary(H2),
    % the size of all the bytes written to the header, including the md5
    % signature (16 bytes)
    FilledSize = size(HeaderPrefix) + size(FileVersion) + size(TermBin) + 16,
    case FilledSize > ?HEADER_SIZE of
        true ->
            % too big!
            {error, error_header_too_large};
        false ->
            % pad out the header with zeros, then take the md5 hash
            PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize))>>,
            Sig = erlang:md5([TermBin, PadZeros]),
            % now we assemble the final header binary and write to disk.
            % NOTE(review): segment order is prefix, version, term, padding,
            % md5; it must match what extract_header/1 parses - confirm
            % against existing database files.
            WriteBin = <<HeaderPrefix/binary, FileVersion/binary,
                TermBin/binary, PadZeros/binary, Sig/binary>>,
            ?HEADER_SIZE = size(WriteBin), % sanity check
            DblWriteBin = [WriteBin, WriteBin],
            ok = couch_file:pwrite(Fd, 0, DblWriteBin),
            {ok, H2}
    end.
%% Reads the doubly written header from the start of Fd and returns the best
%% usable copy.
%%
%% The first 2 * ?HEADER_SIZE bytes hold two header segments. Each is decoded
%% with extract_header/1:
%%   - both valid and equal      -> that header
%%   - both valid but different  -> the one with the higher write_version
%%                                  (the secondary wins a tie); logged
%%   - exactly one valid         -> the valid one; the corruption is logged
%%   - neither valid             -> {error, Reason} from the primary header
%%
%% FilePath is only used in log messages.
read_header(FilePath, Fd) ->
    {ok, Bin} = couch_file:pread(Fd, 0, 2*(?HEADER_SIZE)),
    % split the read into the primary and secondary header segments
    <<Bin1:(?HEADER_SIZE)/binary, Bin2:(?HEADER_SIZE)/binary>> = Bin,
    % read the first header
    case extract_header(Bin1) of
        {ok, Header1} ->
            case extract_header(Bin2) of
                {ok, Header2} ->
                    case Header1 == Header2 of
                        true ->
                            % Everything is completely normal!
                            {ok, Header1};
                        false ->
                            % To get here we must have two different header
                            % versions with signatures intact. It's weird but
                            % possible (a commit failure right at the 2k
                            % boundary). Log it.
                            couch_log:info("Header version differences on database open (~s).~nPrimary Header: ~p~nSecondary Header: ~p", [FilePath, Header1, Header2]),
                            case Header1#db_header.write_version > Header2#db_header.write_version of
                                true -> {ok, Header1};
                                false -> {ok, Header2}
                            end
                    end;
                {error, Error} ->
                    % error reading second header. It's ok, but log it.
                    couch_log:info("Secondary header corruption on database open (~s)(error: ~p). Using primary header instead.", [FilePath, Error]),
                    {ok, Header1}
            end;
        {error, Error} ->
            % error reading primary header
            case extract_header(Bin2) of
                {ok, Header2} ->
                    % log corrupt primary header. It's ok since the secondary
                    % is still good.
                    couch_log:info("Primary header corruption on database open (~s)(error: ~p). Using secondary header instead.", [FilePath, Error]),
                    {ok, Header2};
                _ ->
                    % error reading secondary header too.
                    % return the error, no need to log anything as the caller
                    % will be responsible for dealing with the error.
                    {error, Error}
            end
    end.
%% Decodes one ?HEADER_SIZE byte header segment.
%%
%% Returns:
%%   {ok, #db_header{}}                               on success
%%   {error, unknown_file_type}                       wrong file signature
%%   {error, {incompatible_file_version, Version}}    unsupported version
%%   {error, header_corrupt}                          md5 mismatch
%% Fails with badmatch if Bin is not exactly ?HEADER_SIZE bytes or the decoded
%% term is not a #db_header{} record.
extract_header(Bin) ->
    SizeOfPrefix = size(?DB_FILE_SIG),
    SizeOfTermBin = ?HEADER_SIZE -
                    SizeOfPrefix -
                    2 -     % file version
                    16,     % md5 sig
    % NOTE(review): segment order is prefix, version, term (including zero
    % padding), md5; it must match what write_header/2 emits - confirm
    % against existing database files.
    <<HeaderPrefix:SizeOfPrefix/binary, FileVersion:16,
        TermBin:SizeOfTermBin/binary, Sig:16/binary>> = Bin,

    % check the header prefix
    case HeaderPrefix of
        ?DB_FILE_SIG ->
            % check the file version
            case FileVersion of
                ?FILE_VERSION ->
                    % check the integrity signature
                    case erlang:md5(TermBin) == Sig of
                        true ->
                            % NOTE(review): binary_to_term/1 on file contents;
                            % the md5 only guards against corruption, not
                            % against a crafted file. Consider
                            % binary_to_term(TermBin, [safe]) where available.
                            Header = binary_to_term(TermBin),
                            % make sure we decoded to the right record type
                            #db_header{} = Header,
                            {ok, Header};
                        false ->
                            {error, header_corrupt}
                    end;
                _ ->
                    {error, {incompatible_file_version, FileVersion}}
            end;
        _ ->
            {error, unknown_file_type}
    end.

%% gen_server callback: no state conversion is needed on code upgrade.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% gen_server callback: unexpected messages are dropped.
handle_info(_Info, State) ->
    {noreply, State}.