diff --git a/build.clj b/build.clj index 9d79393cc..d30d182cf 100644 --- a/build.clj +++ b/build.clj @@ -1,6 +1,8 @@ (ns build (:refer-clojure :exclude [compile]) (:require [clojure.edn :as edn] + [clojure.java.io :as io] + [clojure.string :as str] [clojure.tools.build.api :as b])) (def class-dir "target/classes") @@ -15,16 +17,31 @@ "-Xlint:deprecation"]})) (defn javadoc - "Generate Javadoc for the Java API. - Output will be in target/javadoc and automatically included in the jar." + "Generate Javadoc for the Java API into target/javadoc. + tools.build has no javadoc wrapper (there is no `b/javadoc`), so shell out to the JDK + `javadoc` tool via b/process, passing the project classpath so the Java API's imports + (clojure.lang.*, generated classes) resolve. Output is included in the jar at release." [_] - (b/javadoc {:src-dirs ["java/src"] - :output-dir "target/javadoc" - :javadoc-opts ["-public" - "-Xdoclint:none" - "-windowtitle" "Datahike Java API" - "-doctitle" "Datahike Java API Documentation" - "-link" "https://docs.oracle.com/javase/8/docs/api/" - "-link" "https://clojure.github.io/clojure/"]}) - (println "Javadoc generated in target/javadoc") - (println "Javadoc will be automatically published to javadoc.io when released to Clojars")) + (let [out "target/javadoc" + cp (str/join java.io.File/pathSeparator (:classpath-roots basis)) + srcs (->> (io/file "java/src") + file-seq + (filter #(and (.isFile ^java.io.File %) + (str/ends-with? (.getName ^java.io.File %) ".java"))) + (mapv #(.getPath ^java.io.File %))) + args (into ["javadoc" "-d" out "-classpath" cp + "-public" "-Xdoclint:none" + "-windowtitle" "Datahike Java API" + "-doctitle" "Datahike Java API Documentation" + "-link" "https://docs.oracle.com/javase/8/docs/api/" + "-link" "https://clojure.github.io/clojure/"] + srcs) + {:keys [exit]} (b/process {:command-args args})] + ;; javadoc returns non-zero on warnings (the Java API has undocumented elements), which is + ;; non-fatal — the HTML is still produced. Only treat it as a failure if no output appeared. + (when-not (.exists (io/file out "index.html")) + (throw (ex-info "javadoc produced no output" {:exit exit}))) + (when-not (zero? exit) + (println "Note: javadoc exited" exit "(warnings above); docs generated regardless.")) + (println "Javadoc generated in" out) + (println "Javadoc will be automatically published to javadoc.io when released to Clojars"))) diff --git a/deps.edn b/deps.edn index 6de3fff17..aaab49fe1 100644 --- a/deps.edn +++ b/deps.edn @@ -1,14 +1,15 @@ {:deps {org.clojure/clojure {:mvn/version "1.12.4"} org.replikativ/hasch {:mvn/version "0.4.98" :exclusions [org.clojure/clojurescript]} - org.replikativ/konserve {:mvn/version "0.9.346" + org.replikativ/konserve {:mvn/version "0.9.349" ;; includes cljs header meta-size cross-host fix (#143) :exclusions [org.clojure/clojurescript org.clojars.mmb90/cljs-cache]} org.replikativ/superv.async {:mvn/version "0.3.50" :exclusions [org.clojure/clojurescript]} org.replikativ/datalog-parser {:mvn/version "0.2.37"} - org.replikativ/persistent-sorted-set {:mvn/version "0.4.122"} + org.replikativ/persistent-sorted-set {:git/url "https://github.com/replikativ/persistent-sorted-set.git" + :git/sha "2063823a6fa78dcda5570906d9e7509b0394ba68"} ;; diff-buf (feature/op-buf-v5); run `clojure -X:deps prep` to compile its Java environ/environ {:mvn/version "1.2.0"} nrepl/bencode {:mvn/version "1.2.0"} org.replikativ/logging {:mvn/version "0.1.3"} diff --git a/doc/index-root-fusion.md b/doc/index-root-fusion.md new file mode 100644 index 000000000..4f57df0ad --- /dev/null +++ b/doc/index-root-fusion.md @@ -0,0 +1,95 @@ +# Index-root fusion (reduce write amplification) + +*Branch: `feat/fuse-index-roots`. Status: design, pre-implementation.* + +## Problem + +A datahike commit writes `(count pending-writes)` index-node objects + 2 +db-records (under the commit-id and under the branch). Measured ~7 PUTs/commit +for small commits. The index-node objects include each index's **root**, which +changes essentially every commit. On per-request object storage this +amplification is the dominant cost (see saas `doc/cost-model.md`). + +## How the write path works today + +- `db->stored` (`writing.cljc`) calls `di/-flush` on each index → `psset/store` + walks dirty nodes and calls `CachedStorage.store` per node, which **appends + `[address node]` to `pending-writes`** and returns the (content- or squuid-) + address. The root's address becomes `pset._address`. +- The stored-db map references each index as a small record. The PSS konserve + write-handler serializes a `PersistentSortedSet` to `{:meta, :address, + :count}`; the **root node lives separately** at `:address`. Read-handler: + `(PersistentSortedSet. meta cmp address @storage nil count settings 0)` — the + 5th arg (currently `nil`) is the in-memory `_root`. +- `commit!` drains `pending-writes` (`k/assoc store address node`, one PUT + each), then writes the db-record under `cid` and under `branch`. + +## The fusion seam + +Inline each index's **root node** into its db-record reference +(`{:meta, :address, :count, :root }`) and **drop the root from +`pending-writes`** so it isn't PUT separately. Restore passes the inlined node +as the constructor's 5th arg instead of `nil` — deeper children stay lazy. + +Win profile (sharper than "−3 PUTs"): +- **Index = single leaf root (tiny tenant):** the *whole* index inlines → zero + separate node PUTs for it. A few-datom tenant's commit collapses to ~2 + record PUTs. +- **Deeper tree:** saves exactly **1 PUT per index** (the root); the dirty + leaf/intermediate path is still separate — that part is op-buf's job, later. +Also **−1 GET per index on cold open** (root arrives with the record). + +## Options + +- **A — explicit fused index-ref in `db->stored`/`stored->db`** *(recommended)*. + Build `{:meta :address :count :root }`, remove the root from + `pending-writes`, reconstruct via the root-seeding constructor. Contained to + `writing.cljc` + a small helper. Opt-in via config `:fuse-index-roots?` so + it's measurable against baseline. +- **B — embed root in the PSS konserve write/read handler.** More automatic but + the handler would need storage access at serialize time + a way to skip the + separate write. Couples handler to pending state. Messier. +- **C — fusion + branch-as-pointer.** On top of A: write the fused object once + under `cid`, a tiny `{:head cid}` under `branch`. Halves per-commit record + bytes; costs a 2nd GET on branch-open. Optional follow-on. +- **D — inline the whole dirty path (op-buf / mini-WAL in the record).** The + deeper convergence; this is the PSS op-buf work, explicitly *after* A. + +## Implementation plan (Option A) + +Touchpoints, all in datahike (PSS untouched): + +1. **Config:** add `:fuse-index-roots?` (default false). +2. **`db->stored`:** when enabled, for each flushed index pull its root node + (from `CachedStorage` cache at `pset._address`) and emit a fused ref; record + the root address so it can be excluded from the drain. +3. **`commit!` drain:** filter the fused root addresses out of `pending-writes` + before `k/assoc`-ing the rest. (We have `pset._address` per index.) +4. **`stored->db`:** detect the fused ref and reconstruct the index with the + inlined root node seeded into `_root` (constructor 5th arg) + `_address` + + storage for lazy children. +5. **Serialization:** the inlined root is a `Leaf`/`Branch` — already has + konserve read/write handlers, so it nests in the record map for free. + +## Caveats to resolve + +1. **crypto-hash audit** (`index/persistent_set.cljc` `walk-pss-address!`) + starts at the root *address* via `k/get` — with the root inlined there's no + konserve object there. v1: gate fusion on `:crypto-hash? false`, or teach the + walk to take the root from the record. (The merkle `:address` is still + computable from the inlined node, so audit *can* be made to work.) +2. **GC / `mark`:** the fused root has no konserve object; the reachability/free + path must not expect one at that address (don't add it to the konserve-key + reachable set; its children's addresses still are). +3. **`pending-writes` skip must be exact:** only the per-index *root* address is + removed; every deeper dirty node stays. Identify by `pset._address`. +4. **Backwards compat:** a fused db-record must be distinguishable from a legacy + one on read (presence of `:root`), so old stores still restore. + +## Validation + +- Roundtrip: write → restore → `(= (vec before) (vec after))`, counts, slices, + history (`as-of`) — at `:fuse-index-roots? true` and `false`. +- Measure with the saas `commit-cost` probe: PUTs/commit and cold-open GETs, + baseline vs fused, across tiny (single-leaf) and deeper trees. +- Full datahike test suite green with the flag off (byte-identical) and on. diff --git a/src/datahike/audit.cljc b/src/datahike/audit.cljc index a581cea9f..3ed5c3d43 100644 --- a/src/datahike/audit.cljc +++ b/src/datahike/audit.cljc @@ -18,7 +18,12 @@ [konserve.core :as k] [superv.async #?(:clj :refer :cljs :refer-macros) [go-try- d + (and (contains? #{:memory :mem} backend) (contains? d :diff-buf-size)) + (assoc :diff-buf-size 0)))) + (defn from-deprecated [{:keys [backend username password path host port id] :as _backend-cfg} & {:keys [schema-on-read temporal-index index initial-tx] @@ -96,7 +124,7 @@ #?(:clj (java.util.UUID/nameUUIDFromBytes (.getBytes path "UTF-8")) :cljs (uuid path))))})) :index index - :index-config (di/default-index-config index) + :index-config (default-index-config-for-backend index backend) :keep-history? temporal-index :attribute-refs? *default-attribute-refs?* :initial-tx initial-tx @@ -148,7 +176,8 @@ :crypto-hash? *default-crypto-hash?* :branch *default-db-branch* :writer self-writer - :index-config (di/default-index-config *default-index*)}) + ;; storeless ⇒ inherently in-memory ⇒ diff-buf off (no PUTs to fold) + :index-config (default-index-config-for-backend *default-index* :memory)}) (defn remove-nils "Thanks to https://stackoverflow.com/a/34221816" @@ -211,12 +240,13 @@ :index index :branch *default-db-branch* :crypto-hash? *default-crypto-hash?* + :fuse-index-roots? *default-fuse-index-roots?* :writer self-writer :search-cache-size (int-from-env :datahike-search-cache-size *default-search-cache-size*) :store-cache-size (int-from-env :datahike-store-cache-size *default-store-cache-size*) :index-config (if-let [index-config (map-from-env :datahike-index-config nil)] index-config - (di/default-index-config index))} + (default-index-config-for-backend index (:backend store-config)))} merged-config ((comp remove-nils dt/deep-merge) config config-as-arg) {:keys [schema-flexibility initial-tx store attribute-refs?]} merged-config] ;; konserve now handles store config validation at runtime diff --git a/src/datahike/connector.cljc b/src/datahike/connector.cljc index 58c24ae56..81f98f271 100644 --- a/src/datahike/connector.cljc +++ b/src/datahike/connector.cljc @@ -162,6 +162,29 @@ :stored-config stored-config :diff (diff config stored-config)})))) +;; Settings fixed at database creation — they describe the on-disk format/semantics and +;; cannot be changed by reconnecting (changing them would be meaningless or corrupting). +;; Listed explicitly so any future addition is a deliberate decision. +(def create-time-fixed-keys + #{:keep-history? :attribute-refs? :schema-flexibility :index :crypto-hash? :fuse-index-roots? + ;; :index-config sub-keys (PSS): :branching-factor :diff-buf-size + :index-config}) + +;; Of the fixed keys, the ones whose datahike default has changed (:fuse-index-roots?) or +;; that were newly added (:index-config {:branching-factor :diff-buf-size}) are sourced from +;; the STORED config on connect — adopt the stored value, or drop the key when the store +;; predates it. This lets existing stores connect unchanged and new stores reconnect +;; without re-specifying, while the strict consistency check still guards every other key. +;; (:index is already reconciled with a warning in -connect-impl*.) +(defn adopt-stored-fixed [config stored-config] + (let [adopt (fn [c k] (if (contains? stored-config k) (assoc c k (get stored-config k)) (dissoc c k))) + s-ic (or (:index-config stored-config) {}) + adopt-ic (fn [ic k] (if (contains? s-ic k) (assoc ic k (get s-ic k)) (dissoc ic k))) + config (adopt config :fuse-index-roots?) + config (update config :index-config + (fn [ic] (reduce adopt-ic (or ic {}) [:branching-factor :diff-buf-size])))] + (if (empty? (:index-config config)) (dissoc config :index-config) config))) + (defn- normalize-config [cfg] (-> cfg (dissoc :writer :store :store-cache-size :search-cache-size))) @@ -209,6 +232,10 @@ [config store stored-db])) [config store stored-db])) _ (version-check stored-db) + ;; Source create-time-fixed settings (fuse / bf / diff-buf-size) from the + ;; store so existing stores connect unchanged and new ones reconnect + ;; without re-specifying; flows into both the check and the running db. + config (adopt-stored-fixed config (:config stored-db)) _ (when-not (:allow-unsafe-config config) (ensure-stored-config-consistency config (:config stored-db))) conn (conn-from-db (dsi/stored->db (assoc stored-db :config config) store))] diff --git a/src/datahike/gc.cljc b/src/datahike/gc.cljc index 47d9cc830..dd8930a20 100644 --- a/src/datahike/gc.cljc +++ b/src/datahike/gc.cljc @@ -1,6 +1,6 @@ (ns datahike.gc (:require [clojure.set :as set] - [datahike.index.interface :refer [-mark]] + [datahike.index.interface :refer [-mark -seed-root!]] [datahike.index.secondary :as sec] [konserve.core :as k] [konserve.gc :refer [sweep!]] @@ -25,11 +25,23 @@ (recur r visited reachable) (let [{:keys [eavt-key avet-key aevt-key temporal-eavt-key temporal-avet-key temporal-aevt-key + eavt-root aevt-root avet-root + temporal-eavt-root temporal-aevt-root temporal-avet-root schema-meta-key secondary-index-keys] {:keys [datahike/parents datahike/created-at datahike/updated-at]} :meta} (db) + ;; so walk-addresses uses it and only its children are fetched. + _ (do (when eavt-root (-seed-root! eavt-key eavt-root)) + (when aevt-root (-seed-root! aevt-key aevt-root)) + (when avet-root (-seed-root! avet-key avet-root)) + (when temporal-eavt-root (-seed-root! temporal-eavt-key temporal-eavt-root)) + (when temporal-aevt-root (-seed-root! temporal-aevt-key temporal-aevt-root)) + (when temporal-avet-root (-seed-root! temporal-avet-key temporal-avet-root))) in-range? (> (get-time (or updated-at created-at)) (get-time after-date))] (let [sec-reachable (when (seq secondary-index-keys) diff --git a/src/datahike/index.cljc b/src/datahike/index.cljc index c7c28ede7..854f4540c 100644 --- a/src/datahike/index.cljc +++ b/src/datahike/index.cljc @@ -18,6 +18,8 @@ (def -transient di/-transient) (def -persistent! di/-persistent!) (def -mark di/-mark) +(def -root-node di/-root-node) +(def -seed-root! di/-seed-root!) ;; Aliases for multimethods diff --git a/src/datahike/index/interface.cljc b/src/datahike/index/interface.cljc index 23601c42a..508dce233 100644 --- a/src/datahike/index/interface.cljc +++ b/src/datahike/index/interface.cljc @@ -18,7 +18,9 @@ (-flush [index backend] "Saves the changes to the index to the given konserve backend") (-transient [index] "Returns a transient version of the index") (-persistent! [index] "Returns a persistent version of the index") - (-mark [index] "Return konserve addresses that should be whitelisted for mark and sweep gc.")) + (-mark [index] "Return konserve addresses that should be whitelisted for mark and sweep gc.") + (-root-node [index] "Returns the in-memory root node of a flushed index, for root fusion (inlining the root into the db-record).") + (-seed-root! [index root-node] "Seeds the in-memory root node after restoring a db-record that inlined it (root fusion). Returns the index.")) (defmulti empty-index "Creates an empty index" diff --git a/src/datahike/index/persistent_set.cljc b/src/datahike/index/persistent_set.cljc index b25c56518..468f88b0e 100644 --- a/src/datahike/index/persistent_set.cljc +++ b/src/datahike/index/persistent_set.cljc @@ -2,7 +2,7 @@ (:require [clojure.string] [org.replikativ.persistent-sorted-set :as psset] #?(:cljs [org.replikativ.persistent-sorted-set.btset :refer [BTSet]]) - #?(:cljs [org.replikativ.persistent-sorted-set.branch :refer [Branch]]) + #?(:cljs [org.replikativ.persistent-sorted-set.branch :refer [Branch] :as branch]) #?(:cljs [org.replikativ.persistent-sorted-set.leaf :refer [Leaf]]) #?(:cljs [org.replikativ.persistent-sorted-set.impl.storage :refer [IStorage]]) [org.replikativ.persistent-sorted-set.arrays :as arrays] @@ -23,9 +23,21 @@ #?(:cljs (:require-macros [datahike.index.persistent-set :refer [generate-slice-comparator-constructor]])) #?(:clj (:import [datahike.datom Datom] [org.fressian.handlers WriteHandler ReadHandler] - [org.replikativ.persistent_sorted_set PersistentSortedSet IStorage Leaf Branch ANode Settings] + [org.replikativ.persistent_sorted_set PersistentSortedSet IStorage Leaf Branch ANode Settings Slot] [java.util List]))) +;; diff-buf write-optimization knob (JVM only). A non-zero diff-buf-size makes a commit +;; buffer content-only child diffs into the rewritten ancestor instead of rewriting the +;; whole spine — ~1 PUT/commit for small commits. Primary source is the persisted index +;; config key `:diff-buf-size` (so it round-trips with the store and the consistency check +;; guards it); the `pss.diffBufSize` JVM sysprop is a fallback for ad-hoc experiments only. +;; 0 ⇒ baseline (off) — the default, protecting existing persistent-sorted-set stores. +(defn diff-buf-size ^long [index-config] + (long (or (:diff-buf-size index-config) + ;; JVM-only sysprop fallback for ad-hoc experiments; cljs has no sysprops. + #?(:clj (try (Long/parseLong (System/getProperty "pss.diffBufSize" "0")) (catch Exception _ 0)) + :cljs 0)))) + (def index-type->kwseq {:eavt [:e :a :v :tx :added] :aevt [:a :e :v :tx :added] @@ -212,18 +224,57 @@ (-persistent! [^PersistentSortedSet pset] (persistent! pset)) (-mark [^PersistentSortedSet pset] - (mark pset))) - -(defn- gen-address [^ANode node crypto-hash?] + (mark pset)) + (-root-node [^PersistentSortedSet pset] + ;; In-memory top node; populated after -flush set _root/_address. + #?(:clj (.root pset) + :cljs (.-root pset))) + (-seed-root! [^PersistentSortedSet pset root-node] + ;; Install an inlined (fused) root so root() returns it without a + ;; storage round-trip; deeper children stay lazy via the set's storage. + ;; clj only — root fusion is a JVM feature for now. + #?(:clj (set! (.-_root pset) root-node)) + pset)) + +;; Normalize a value for content hashing: Datoms → vectors (mirrors the leaf hash, and +;; makes the hash independent of the Datom type's identity), maps/seqs recursed. Used so a +;; slot's diff hashes the same whether it's a live PersistentTreeMap (store) or a plain +;; deserialized map (restore) — hasch already canonicalizes map key order. +(defn- canon [x] + (cond + (instance? #?(:clj Datom :cljs dd/Datom) x) + (vec (seq x)) + (map? x) (persistent! (reduce-kv (fn [m k v] (assoc! m (canon k) (canon v))) (transient {}) x)) + (sequential? x) (mapv canon x) + :else x)) + +;; diff-buf crypto address of a Branch. Baseline (no slots) hashes the child addresses — +;; UNCHANGED, so existing crypto stores keep their hashes. With diff-buf the buffered diff +;; lives in the slots (not reflected in the anchor child-addresses), so fold the slots in: +;; the address then reflects the durable representation (anchors + diff) and the audit +;; recomputes the same from the stored node. (Within-store integrity; consistent with the +;; baseline merkle already being shape/representation-dependent.) +(defn- branch-crypto-uuid [node] + (let [slots #?(:clj (.slotsForStorage ^Branch node) :cljs (branch/slots-for-storage node)) + addresses (vec #?(:clj (.addresses ^Branch node) :cljs (.-addresses node)))] + (if slots + (uuid (canon [addresses slots])) + (uuid addresses)))) + +(defn- gen-address [node crypto-hash?] (if crypto-hash? (if (instance? Branch node) - (uuid (vec (.addresses ^Branch node))) - (uuid (mapv (comp vec seq) (.keys node)))) + (branch-crypto-uuid node) ;; folds diff-buf slots on BOTH hosts (cross-host hash parity) + (uuid (mapv (comp vec seq) #?(:clj (.keys ^Leaf node) :cljs (.-keys node))))) (squuid))) ;; Sequential UUID for better index locality -#?(:clj - (defn- walk-pss-address! - "Read the node at `address` directly from konserve, recompute its +(declare walk-pss-address!) + +(defn- node-class-name [node] + #?(:clj (some-> node class .getName) :cljs (some-> node type pr-str))) + +(defn- walk-pss-address! + "Read the node at `address` directly from konserve, recompute its content-addressed UUID, and confirm it matches `address`. Recurses into Branch children, accumulating any anomalies into the `errors` atom (instead of throwing). @@ -242,71 +293,102 @@ Reads go through `k/get store` directly, bypassing the live `CachedStorage` LRU; otherwise a hot in-memory copy could mask a tampered on-disk blob." - [store address verified errors] - (when-not (contains? @verified address) - (let [node (k/get store address nil {:sync? true})] - (cond - (nil? node) - (swap! errors conj {:type :audit/node-missing :address address}) - - :else - (let [recomputed (cond - (instance? Branch node) - (uuid (vec (.addresses ^Branch node))) - (instance? Leaf node) - (uuid (mapv (comp vec seq) (.keys ^Leaf node))))] - (cond - (nil? recomputed) - (swap! errors conj {:type :audit/unknown-node-class - :address address - :node-class (some-> node class .getName)}) - - (not= address recomputed) - (swap! errors conj {:type :audit/merkle-mismatch - :address address - :expected address - :recomputed recomputed - :node-class (some-> node class .getName)}) - - :else - (do - (when (instance? Branch node) - (doseq [child-addr (.addresses ^Branch node)] - (walk-pss-address! store child-addr verified errors))) - (swap! verified conj address))))))))) + [store address verified errors] + (when-not (contains? @verified address) + (let [node (k/get store address nil {:sync? true})] + (cond + (nil? node) + (swap! errors conj {:type :audit/node-missing :address address}) + + :else + (let [recomputed (cond + (instance? Branch node) + (branch-crypto-uuid node) + (instance? Leaf node) + (uuid (mapv (comp vec seq) #?(:clj (.keys ^Leaf node) :cljs (.-keys node)))))] + (cond + (nil? recomputed) + (swap! errors conj {:type :audit/unknown-node-class + :address address + :node-class (node-class-name node)}) + + (not= address recomputed) + (swap! errors conj {:type :audit/merkle-mismatch + :address address + :expected address + :recomputed recomputed + :node-class (node-class-name node)}) + + :else + (do + (when (instance? Branch node) + (doseq [child-addr #?(:clj (.addresses ^Branch node) :cljs (.-addresses node))] + (walk-pss-address! store child-addr verified errors))) + (swap! verified conj address)))))))) + +(defn- walk-pss-node! + "Like walk-pss-address! but for a node already in hand — used for a FUSED root, which + is inlined in the db-record and therefore not a separate konserve object. Recomputes + the node's content UUID, confirms it equals `address`, and recurses into its children + (which ARE separate objects) via walk-pss-address!." + [store node address verified errors] + (when-not (contains? @verified address) + (let [recomputed (cond + (instance? Branch node) (branch-crypto-uuid node) + (instance? Leaf node) (uuid (mapv (comp vec seq) #?(:clj (.keys ^Leaf node) :cljs (.-keys node)))))] + (cond + (nil? recomputed) + (swap! errors conj {:type :audit/unknown-node-class :address address + :node-class (node-class-name node)}) + (not= address recomputed) + (swap! errors conj {:type :audit/merkle-mismatch :address address :expected address + :recomputed recomputed :node-class (node-class-name node)}) + :else + (do (when (instance? Branch node) + (doseq [child-addr #?(:clj (.addresses ^Branch node) :cljs (.-addresses node))] + (walk-pss-address! store child-addr verified errors))) + (swap! verified conj address)))))) (extend-type #?(:clj PersistentSortedSet :cljs BTSet) IAuditable - (-merkle-root [^PersistentSortedSet pset] + (-merkle-root [pset] ;; gen-address (below) makes every node UUID a recursive content - ;; hash of its datoms under :crypto-hash?, so the root _address + ;; hash of its datoms under :crypto-hash?, so the root address ;; captures the whole tree. Set by psset/store during -flush. ;; Returns nil when unflushed; never throws. - (.-_address pset)) - (-recompute-merkle-root [^PersistentSortedSet pset] + #?(:clj (.-_address ^PersistentSortedSet pset) :cljs (.-address ^BTSet pset))) + (-recompute-merkle-root [pset] ;; Walk the tree from konserve, deserialize each node, and confirm ;; its bytes hash back to its address. Konserve does NOT verify ;; content on read, so without this walk a tampered .ksv file would - ;; round-trip undetected — only the in-memory `_address` would still + ;; round-trip undetected — only the in-memory address would still ;; look correct. Returns a result map; never throws on mismatch. - #?(:clj - (let [address (.-_address pset) - storage (.-_storage pset) - store (some-> storage :store)] - (cond - (nil? address) - {:status :unsupported :reason :unflushed} - (nil? store) - {:status :unsupported :reason :no-store} - :else - (let [verified (atom #{}) - errors (atom [])] - (walk-pss-address! store address verified errors) - (if (seq @errors) - {:status :mismatch :root nil :errors @errors} - {:status :ok :root address})))) - :cljs - {:status :unsupported :reason :cljs-not-implemented}))) + ;; Cross-platform: clj reads PersistentSortedSet._address/_storage/.root, + ;; cljs reads the BTSet's address/storage/root fields; the walk/recompute + ;; (branch-crypto-uuid + canon + uuid) is shared so hashes match cross-host. + (let [address #?(:clj (.-_address ^PersistentSortedSet pset) :cljs (.-address ^BTSet pset)) + storage #?(:clj (.-_storage ^PersistentSortedSet pset) :cljs (.-storage ^BTSet pset)) + store (some-> storage :store) + root #?(:clj (.root ^PersistentSortedSet pset) :cljs (.-root ^BTSet pset))] + (cond + (nil? address) + {:status :unsupported :reason :unflushed} + (nil? store) + {:status :unsupported :reason :no-store} + :else + (let [verified (atom #{}) + errors (atom []) + ;; Fused root: inlined in the db-record, not a separate object. Detect by a + ;; direct store read; when absent, verify the seeded in-memory root instead + ;; (recomputing its content hash still detects db-record tampering of the + ;; root), then recurse children (separate objects) as usual. + root-node (k/get store address nil {:sync? true})] + (if (nil? root-node) + (walk-pss-node! store root address verified errors) + (walk-pss-node! store root-node address verified errors)) + (if (seq @errors) + {:status :mismatch :root nil :errors @errors} + {:status :ok :root address})))))) (defn- freelist-pop! "Atomically pop an address from the freelist. Returns nil if empty." @@ -385,14 +467,24 @@ (def ^:const DEFAULT_BRANCHING_FACTOR 512) -(defmethod di/empty-index :datahike.index/persistent-set [_index-name store index-type _] - (let [^PersistentSortedSet pset (psset/sorted-set* {:comparator (index-type->cmp-quick index-type false) +;; Branching factor is create-time-fixed: a tree built at one bf must never be mutated +;; at another (mixed node sizes break the min/max invariants). Sourced from the persisted +;; index-config (default 512 ⇒ existing stores, built at 512, are unaffected). Must reach +;; BOTH fresh-set creation AND the deserialization Settings, else a non-512 store would be +;; mutated at 512 on restore. The consistency check guards against accidental change. +(defn- branching-factor ^long [index-config] + (long (or (:branching-factor index-config) DEFAULT_BRANCHING_FACTOR))) + +(defmethod di/empty-index :datahike.index/persistent-set [_index-name store index-type index-config] + (let [cmp (index-type->cmp-quick index-type false) + ^PersistentSortedSet pset (psset/sorted-set* {:comparator cmp :storage (:storage store) - :branching-factor DEFAULT_BRANCHING_FACTOR})] + :branching-factor (branching-factor index-config) + :diff-buf-size (diff-buf-size index-config)})] (with-meta pset {:index-type index-type}))) -(defmethod di/init-index :datahike.index/persistent-set [_index-name store datoms index-type _ {:keys [indexed]}] +(defmethod di/init-index :datahike.index/persistent-set [_index-name store datoms index-type _ {:keys [indexed] :as index-config}] (let [arr (if (= index-type :avet) (->> datoms (filter #(contains? indexed (.-a ^Datom %))) @@ -401,10 +493,12 @@ (not (arrays/array? datoms)) (arrays/into-array))) _ (arrays/asort arr (index-type->cmp-quick index-type false)) - ^PersistentSortedSet pset (psset/from-sorted-array (index-type->cmp-quick index-type false) + cmp (index-type->cmp-quick index-type false) + ^PersistentSortedSet pset (psset/from-sorted-array cmp arr (arrays/alength arr) - {:branching-factor DEFAULT_BRANCHING_FACTOR})] + {:branching-factor (branching-factor index-config) + :diff-buf-size (diff-buf-size index-config)})] (set! (.-_storage pset) (:storage store)) (with-meta pset {:index-type index-type}))) @@ -412,10 +506,12 @@ ;; temporary import from psset until public (defn- map->settings ^Settings [m] #?(:cljs m + ;; 5-arg normalizing ctor (bf, refType, measure, leaf-processor, diffBufSize): defaults + ;; refType to SOFT when nil. diff-buf: deserialized nodes need diffBufSize>0 to project. :clj (Settings. (int (or (:branching-factor m) 0)) - nil ;; weak ref default - ))) + nil nil nil + (int (or (:diff-buf-size m) 0))))) (defmethod di/add-konserve-handlers :datahike.index/persistent-set [config store] ;; Check if store has pre-configured handlers (e.g., LMDB with buffer encoder). @@ -429,7 +525,8 @@ ;; Standard fressian store - set up serializers ;; deal with circular reference between storage and store - (let [settings (map->settings {:branching-factor DEFAULT_BRANCHING_FACTOR}) + (let [settings (map->settings {:branching-factor (branching-factor (:index-config config)) + :diff-buf-size (diff-buf-size (:index-config config))}) storage (atom nil) store (k/assoc-serializers @@ -446,12 +543,18 @@ ;; The following fields are reset as they cannot be accessed from outside: ;; - 'edit' is set to false, i.e. the set is assumed to be persistent, not transient ;; - 'version' is set back to 0 + ;; diff-buf: the set's per-index comparator (cmp) is propagated to its + ;; Branch nodes (Branch._projCmp) for buffered-leaf projection; the + ;; shared storage carries no comparator. (PersistentSortedSet. meta cmp address @storage nil count settings 0)))) :cljs (fn [reader _tag _component-count] (let [{:keys [meta address count]} (fress/read-object reader) cmp (index-type->cmp-quick (:index-type meta) false)] ;; CLJS BTSet deftype: [root cnt comparator meta _hash storage address settings] + ;; diff-buf: the set's per-index comparator (cmp) is propagated to its + ;; Branch nodes (_projCmp) for buffered-leaf projection; shared storage + ;; carries no comparator. (BTSet. nil count cmp meta nil @storage address settings)))) "datahike.index.PersistentSortedSet.Leaf" #?(:clj @@ -468,13 +571,36 @@ #?(:clj (reify ReadHandler (read [_ reader _tag _component-count] - (let [{:keys [keys level addresses subtree-count]} (.readObject reader)] - (Branch. (int level) (count keys) (into-array Object keys) (into-array Object (seq addresses)) nil (long (or subtree-count -1)) settings)))) + (let [{:keys [keys level addresses subtree-count slots]} (.readObject reader) + addr-vec (vec addresses) + ^Branch b (Branch. (int level) (count keys) (into-array Object keys) (into-array Object (seq addresses)) nil (long (or subtree-count -1)) settings)] + ;; diff-buf: reconstruct per-child buffered diffs (anchor = the child's + ;; durable address). Branch.child projects them on descent. Absent ⇒ baseline. + (when slots + (let [arr (object-array (count keys))] + (doseq [[idx entry] slots] + (aset arr (int idx) + (Slot. (:diff entry) (long (:count entry)) (:measure entry) (nth addr-vec (int idx))))) + (set! (.-_slots b) arr))) + b))) :cljs (fn [reader _tag _component-count] - (let [{:keys [keys level addresses subtree-count]} (fress/read-object reader)] - ;; CLJS Branch deftype: [level keys children addresses subtree-count _measure settings] - (Branch. (int level) (clj->js keys) nil (clj->js addresses) (or subtree-count -1) nil settings)))) + (let [{:keys [keys level addresses subtree-count slots]} (fress/read-object reader) + addr-arr (clj->js addresses) + ;; CLJS Branch deftype: [level keys children addresses subtree-count _measure settings _slots _rebalanced] + b (Branch. (int level) (clj->js keys) nil addr-arr (or subtree-count -1) nil settings nil false)] + ;; diff-buf: reconstruct per-child buffered diffs (anchor = the child's + ;; durable address). Branch.child projects them on descent. Absent ⇒ baseline. + (when slots + (let [arr (make-array (count keys))] + (doseq [[idx entry] slots] + (aset arr (int idx) + {:diff (:diff entry) + :count (long (:count entry)) + :measure (:measure entry) + :anchor (aget addr-arr (int idx))})) + (set! (.-_slots b) arr))) + b))) "datahike.datom.Datom" #?(:clj (reify ReadHandler @@ -513,10 +639,14 @@ (reify WriteHandler (write [_ writer node] (.writeTag writer "datahike.index.PersistentSortedSet.Branch" 1) - (.writeObject writer {:level (.level ^Branch node) - :keys (.keys ^Branch node) - :addresses (.addresses ^Branch node) - :subtree-count (.subtreeCount ^Branch node)})))} + ;; diff-buf: emit :slots only when present (nil ⇒ byte-identical to + ;; the pre-diff-buf format, so diffBufSize=0 / legacy DBs are unaffected). + (let [slots (.slotsForStorage ^Branch node)] + (.writeObject writer (cond-> {:level (.level ^Branch node) + :keys (.keys ^Branch node) + :addresses (.addresses ^Branch node) + :subtree-count (.subtreeCount ^Branch node)} + slots (assoc :slots slots))))))} datahike.datom.Datom {"datahike.datom.Datom" @@ -545,10 +675,14 @@ Branch (fn [writer node] (fress/write-tag writer "datahike.index.PersistentSortedSet.Branch" 1) - (fress/write-object writer {:level (.-level ^Branch node) - :keys (vec (.-keys ^Branch node)) - :addresses (vec (.-addresses ^Branch node)) - :subtree-count (.-subtree-count ^Branch node)})) + ;; diff-buf: emit :slots only when present (nil ⇒ byte-identical to + ;; the pre-diff-buf format, so diff-buf-size=0 / legacy DBs are unaffected). + (let [slots (branch/slots-for-storage ^Branch node)] + (fress/write-object writer (cond-> {:level (.-level ^Branch node) + :keys (vec (.-keys ^Branch node)) + :addresses (vec (.-addresses ^Branch node)) + :subtree-count (.-subtree-count ^Branch node)} + slots (assoc :slots slots))))) datahike.datom.Datom (fn [writer datom] @@ -562,4 +696,9 @@ store) (defmethod di/default-index-config :datahike.index/persistent-set [_index-name] - {}) + ;; diff-buf: diff-buffering ON by default for NEW stores (budget 256) — ~1 PUT/commit + ;; for small commits on object stores. Baked into the stored config at create time, so + ;; existing stores keep their own value (adopt-stored-fixed sources it from the store, and + ;; `diff-buf-size` defaults to 0 when absent ⇒ pre-diff-buf stores stay baseline). Set + ;; {:diff-buf-size 0} explicitly to disable. + {:diff-buf-size 256}) diff --git a/src/datahike/writing.cljc b/src/datahike/writing.cljc index 988a0fd63..d6cab102b 100644 --- a/src/datahike/writing.cljc +++ b/src/datahike/writing.cljc @@ -130,7 +130,21 @@ :temporal-aevt-key (safe-root temporal-aevt') :temporal-avet-key (safe-root temporal-avet')) sec-roots - (assoc :secondary sec-roots))] + (assoc :secondary sec-roots)) + ;; Root fusion: inline each flushed index's root node into the + ;; db-record. `commit!` then skips writing those root nodes as + ;; separate objects (see fused-root-addresses). Works under crypto-hash?: + ;; the root's address is still its content hash, and the audit walk + ;; verifies the inlined root (walk-pss-node!) + recurses children. + fuse? (and flush! (:fuse-index-roots? config)) + fused-roots (when fuse? + (cond-> {:eavt-root (di/-root-node eavt') + :aevt-root (di/-root-node aevt') + :avet-root (di/-root-node avet')} + (:keep-history? config) + (assoc :temporal-eavt-root (di/-root-node temporal-eavt') + :temporal-aevt-root (di/-root-node temporal-aevt') + :temporal-avet-root (di/-root-node temporal-avet'))))] [schema-meta-kv-to-write (merge {:schema-meta-key schema-meta-key @@ -149,7 +163,8 @@ :temporal-aevt-key temporal-aevt' :temporal-avet-key temporal-avet'}) (when secondary-index-keys - {:secondary-index-keys secondary-index-keys}))]))) + {:secondary-index-keys secondary-index-keys}) + fused-roots)]))) (defn- restore-secondary-indices "Restore secondary index instances from stored key-maps. @@ -200,10 +215,22 @@ [stored-db store] (let [{:keys [eavt-key aevt-key avet-key temporal-eavt-key temporal-aevt-key temporal-avet-key + eavt-root aevt-root avet-root + temporal-eavt-root temporal-aevt-root temporal-avet-root secondary-index-keys schema rschema system-entities ref-ident-map ident-ref-map config max-tx max-eid op-count hash meta schema-meta-key] :or {op-count 0}} stored-db + ;; Root fusion: if the record inlined index roots, seed them into the + ;; restored indexes so root() returns them with no storage round-trip + ;; (deeper children stay lazy). Presence-based, so fused and legacy + ;; records both restore — no reader config needed. + _ (do (when eavt-root (di/-seed-root! eavt-key eavt-root)) + (when aevt-root (di/-seed-root! aevt-key aevt-root)) + (when avet-root (di/-seed-root! avet-key avet-root)) + (when temporal-eavt-root (di/-seed-root! temporal-eavt-key temporal-eavt-root)) + (when temporal-aevt-root (di/-seed-root! temporal-aevt-key temporal-aevt-root)) + (when temporal-avet-root (di/-seed-root! temporal-avet-key temporal-avet-root))) schema-meta (or (sc/cache-lookup schema-meta-key) ;; not in store in case we load an old db where the schema meta data was inline (when-let [schema-meta (k/get store schema-meta-key nil {:sync? true})] @@ -287,6 +314,21 @@ content-uuid (squuid content-uuid))))) +(defn- fused-root-addresses + "When root fusion is enabled, the addresses of the index root nodes that + `db->stored` inlined into the record. These must be excluded from the + pending-writes drain so they are not also written as separate objects. + Under root fusion (non-crypto-hash) `:merkle-roots` holds each index's + root `_address`, which is exactly its pending-writes key." + [config db-to-store] + (when (:fuse-index-roots? config) + (->> (select-keys (:merkle-roots db-to-store) + [:eavt-key :aevt-key :avet-key + :temporal-eavt-key :temporal-aevt-key :temporal-avet-key]) + vals + (remove nil?) + set))) + (defn write-pending-kvs! "Writes a collection of key-value pairs to the store. Handles synchronous and asynchronous writes. @@ -318,7 +360,12 @@ db (assoc-in db [:meta :datahike/commit-id] cid) db-to-store (assoc-in db-to-store-pre [:meta :datahike/commit-id] cid) - pending-kvs (get-and-clear-pending-kvs! store)] + ;; Root fusion: roots are inlined in db-to-store, so drop + ;; them from the separate-object writes. + fused-addrs (fused-root-addresses config db-to-store) + pending-kvs (cond->> (get-and-clear-pending-kvs! store) + (seq fused-addrs) + (remove (fn [[k _]] (contains? fused-addrs k))))] (if (multi-key-capable? store) (let [[meta-key meta-val] schema-meta-kv-to-write diff --git a/test/datahike/test/config_test.cljc b/test/datahike/test/config_test.cljc index dd2b78e16..ce760eee0 100644 --- a/test/datahike/test/config_test.cljc +++ b/test/datahike/test/config_test.cljc @@ -38,18 +38,21 @@ :keep-history? true :initial-tx nil :index :datahike.index/persistent-set - :index-config {} :schema-flexibility :write :crypto-hash? false :branch :db :writer c/self-writer :search-cache-size c/*default-search-cache-size* :store-cache-size c/*default-store-cache-size*}] + ;; diff-buf defaults backend-aware: 0 in-memory (no PUTs to fold ⇒ pure overhead), + ;; on (256) for durable object stores like :file. (is (= (merge default-new-cfg - {:store {:backend :memory :id #uuid "ec3537bd-3f0d-3719-acd5-40751bbb1012"}}) + {:index-config {:diff-buf-size 0} + :store {:backend :memory :id #uuid "ec3537bd-3f0d-3719-acd5-40751bbb1012"}}) (c/from-deprecated mem-cfg))) (is (= (merge default-new-cfg - {:store {:backend :file + {:index-config {:diff-buf-size 256} + :store {:backend :file :path "/deprecated/test" :id #uuid "908d33ed-b562-3301-9a9f-94b961e56f05"}}) (c/from-deprecated file-cfg))))) @@ -63,12 +66,14 @@ :schema-flexibility c/*default-schema-flexibility* :index c/*default-index* :crypto-hash? c/*default-crypto-hash?* + :fuse-index-roots? c/*default-fuse-index-roots?* :branch c/*default-db-branch* :writer c/self-writer :search-cache-size c/*default-search-cache-size* :store-cache-size c/*default-store-cache-size*} + ;; default store is :memory ⇒ diff-buf defaults off (backend-aware) (when (seq (di/default-index-config c/*default-index*)) - {:index-config (di/default-index-config c/*default-index*)})) + {:index-config (c/default-index-config-for-backend c/*default-index* :memory)})) (update config :store dissoc :id :scope)))))) (deftest core-config-test diff --git a/test/datahike/test/diff_buf_generative.clj b/test/datahike/test/diff_buf_generative.clj new file mode 100644 index 000000000..1ab6c829f --- /dev/null +++ b/test/datahike/test/diff_buf_generative.clj @@ -0,0 +1,77 @@ +(ns datahike.test.diff-buf-generative + "Seeded end-to-end generative model test for diff-buf write-buffering. Random + transact / value-upsert / retractEntity against a Clojure model, with a release+ + reconnect each cycle (forces a fressian reload from the store). This exercises the + full stack together — PSS diff-buf + the fressian :slots handlers + commit-log + HEAD + + crypto-hash — which the PSS-level harness (edn storage) can't reach. + + Deterministic via (java.util.Random seed): a failure reproduces from (seed, params). + Swept over {diff-buf 0/256} × {crypto-hash off/on}. The bounded `diff-buf-generative` + deftest runs in the suite; `run` drives bigger on-demand sweeps." + (:require [datahike.api :as d] + [clojure.test :refer [deftest is]]) + (:import [java.util Random])) + +(def schema + [{:db/ident :id :db/valueType :db.type/long :db/cardinality :db.cardinality/one :db/unique :db.unique/identity} + {:db/ident :a :db/valueType :db.type/long :db/cardinality :db.cardinality/one} + {:db/ident :b :db/valueType :db.type/string :db/cardinality :db.cardinality/one}]) + +(defn run-trial + "One deterministic trial. Returns nil on success, or a failure map (never throws on a + content mismatch — only real errors propagate)." + [seed {:keys [idrange cycles ops crypto? diff-buf]}] + (let [rng (Random. seed) + path (str (System/getProperty "java.io.tmpdir") "/dh-diffbuf-gen-" seed "-" diff-buf "-" (if crypto? "c" "p")) + cfg {:store {:backend :file :path path :id (java.util.UUID/randomUUID)} + :schema-flexibility :write :keep-history? false + :crypto-hash? (boolean crypto?) + :index-config {:diff-buf-size diff-buf :branching-factor 16}} + model (atom {})] ; id -> {:a long :b string} + (d/delete-database cfg) + (d/create-database cfg) + (let [conn (atom (d/connect cfg)) fail (atom nil)] + (try + (d/transact @conn schema) + (dotimes [c cycles] + (when-not @fail + (dotimes [_ ops] + (let [r (.nextInt rng 3) id (long (.nextInt rng (int idrange)))] + (cond + (= r 0) (let [a (long (.nextInt rng 1000)) b (str (.nextInt rng 1000))] + (swap! model assoc id {:a a :b b}) + (d/transact @conn [{:id id :a a :b b}])) + (and (= r 1) (contains? @model id)) ; value-upsert (change :a only) + (let [a (long (.nextInt rng 1000))] + (swap! model update id assoc :a a) + (d/transact @conn [{:id id :a a}])) + (and (= r 2) (contains? @model id)) ; retract whole entity + (do (swap! model dissoc id) + (d/transact @conn [[:db/retractEntity [:id id]]]))))) + ;; reopen — forces a cold fressian reload from the store + (d/release @conn) + (reset! conn (d/connect cfg)) + (let [db @@conn + got (into {} (map (fn [[id a b]] [id {:a a :b b}])) + (d/q '[:find ?id ?a ?b :where [?e :id ?id] [?e :a ?a] [?e :b ?b]] db))] + (when (not= @model got) + (reset! fail {:seed seed :cycle c :params {:crypto? crypto? :diff-buf diff-buf} + :model-n (count @model) :got-n (count got)}))))) + @fail + (finally + (try (d/release @conn) (catch Throwable _)) + (try (d/delete-database cfg) (catch Throwable _))))))) + +(defn run + "Sweep grid × seeds; returns the seq of failures (empty = all good)." + [grid seeds] + (->> (for [params grid seed (range seeds)] (run-trial seed params)) + (remove nil?) + vec)) + +(deftest diff-buf-generative + (let [grid (for [crypto? [false true] diff-buf [0 256]] + {:idrange 250 :cycles 6 :ops 35 :crypto? crypto? :diff-buf diff-buf}) + fails (run grid 3)] + (is (empty? fails) + (str (count fails) " generative trial(s) diverged from model: " (pr-str (vec (take 6 fails))))))) diff --git a/test/datahike/test/nodejs_test.cljs b/test/datahike/test/nodejs_test.cljs index f44f79bde..8b527635c 100644 --- a/test/datahike/test/nodejs_test.cljs +++ b/test/datahike/test/nodejs_test.cljs @@ -1,9 +1,12 @@ (ns datahike.test.nodejs-test (:require [cljs.test :refer [deftest is async] :as t] + [cljs.reader] [datahike.api :as d] + [datahike.index.audit :as ia] + [datahike.audit :as audit] [datahike.online-gc :as online-gc] [konserve.core :as k] - [konserve.node-filestore] ;; Register :file backend for Node.js + [konserve.node-filestore :as nfs] ;; Register :file backend for Node.js [cljs.core.async :refer [go > (d/datoms db :eavt) + (map (fn [d] [(:e d) (name (:a d)) (str (:v d))])) + (sort) + (vec)) + got-n-count (d/q '[:find (count ?e) . :where [?e :n _]] db) + got-n-sum (reduce + (map :v (filter #(= :n (:a %)) (d/datoms db :eavt))))] + (is (= datom-count (count got-datoms)) + (str "cljs read same datom count (jvm=" datom-count " cljs=" (count got-datoms) ")")) + (is (= n-count got-n-count) + (str ":n entity count matches (jvm=" n-count " cljs=" got-n-count ")")) + (is (= n-sum got-n-sum) + (str ":n value sum matches (projection-sound) (jvm=" n-sum " cljs=" got-n-sum ")")) + (is (= datoms got-datoms) + "cljs eavt datoms identical to JVM (full buffered-leaf projection)") + (d/release conn))) + (catch js/Error e + (is false (str "jvm-opbuf-exchange-test error: " (.-message e)))) + (finally + (done)))))) + +;; diff-buf phase-2 gate: cljs WRITE path. Same-host (create+transact+query all in cljs, +;; avoiding the pre-existing cross-host connect bug). Incremental commits make leaves +;; content-only dirty → buffered leaf slots in the root → on cold reopen they project back. +;; Writes to a FIXED dir (not deleted) so buffering can be confirmed externally (grep slots). +(def ^:private cljs-opbuf-dir "/tmp/dh-cljs-opbuf") + +(deftest cljs-opbuf-write-roundtrip-test + (let [sid #uuid "00000000-0000-0000-0000-00000000c1c5" + cfg {:store {:backend :file :path cljs-opbuf-dir :id sid} + :schema-flexibility :write :keep-history? false + :index :datahike.index/persistent-set + :index-config {:diff-buf-size 256}}] + (async done + (go + (try + (when (bf entities so the index has BRANCH nodes (diff-buf only engages on + ;; branches; a sub-512 tree is a single leaf and never buffers). + (loop [bs (partition-all 200 (range 2000))] + (when (seq bs) + (= round 40) + (do (d/release conn) + (let [c (d/connect cfg)] + (is (= @present (idset c)) (str "final ref=" (count @present) " got=" (count (idset c)))) + (d/release c))) + (let [insert? (even? (rnd 2)) + cand (vec (distinct (repeatedly 40 #(rnd 4000)))) + ops (if insert? (vec (remove @present cand)) (vec (filter @present cand)))] + (when (seq ops) + (if insert? + (do (