5555import java .util .Objects ;
5656import java .util .PriorityQueue ;
5757import java .util .Queue ;
58- import java .util .Random ;
5958import java .util .Set ;
59+ import java .util .SplittableRandom ;
6060import java .util .concurrent .CompletableFuture ;
6161import java .util .concurrent .Executor ;
6262import java .util .concurrent .atomic .AtomicReference ;
@@ -89,8 +89,6 @@ public class HNSW {
8989 @ Nonnull
9090 private static final Logger logger = LoggerFactory .getLogger (HNSW .class );
9191
92- @ Nonnull
93- private final Random random ;
9492 @ Nonnull
9593 private final Subspace subspace ;
9694 @ Nonnull
@@ -141,7 +139,6 @@ public HNSW(@Nonnull final Subspace subspace,
141139 @ Nonnull final Config config ,
142140 @ Nonnull final OnWriteListener onWriteListener ,
143141 @ Nonnull final OnReadListener onReadListener ) {
144- this .random = new Random (config .getRandomSeed ());
145142 this .subspace = subspace ;
146143 this .executor = executor ;
147144 this .config = config ;
@@ -581,7 +578,7 @@ private Quantizer quantizer(@Nullable final AccessInfo accessInfo) {
581578 return onReadListener .onAsyncRead (
582579 storageAdapter .fetchNode (readTransaction , storageTransform , layer ,
583580 nodeReference .getPrimaryKey ()))
584- .thenApply (node -> biMapFunction .apply (nodeReference , node ));
581+ .thenApply (node -> biMapFunction .apply (nodeReference , Objects . requireNonNull ( node ) ));
585582 }
586583
587584 /**
@@ -748,19 +745,35 @@ private Quantizer quantizer(@Nullable final AccessInfo accessInfo) {
748745 @ Nonnull
749746 public CompletableFuture <Void > insert (@ Nonnull final Transaction transaction , @ Nonnull final Tuple newPrimaryKey ,
750747 @ Nonnull final RealVector newVector ) {
751- final int insertionLayer = insertionLayer ();
748+ final SplittableRandom random = random (newPrimaryKey );
749+ final int insertionLayer = insertionLayer (random );
752750 if (logger .isTraceEnabled ()) {
753751 logger .trace ("new node with key={} selected to be inserted into layer={}" , newPrimaryKey , insertionLayer );
754752 }
755753
756754 return StorageAdapter .fetchAccessInfo (getConfig (), transaction , getSubspace (), getOnReadListener ())
757- .thenCompose (accessInfo -> {
758- final AccessInfo currentAccessInfo ;
755+ .thenCombine (exists (transaction , newPrimaryKey ),
756+ (accessInfo , nodeAlreadyExists ) -> {
757+ if (nodeAlreadyExists ) {
758+ if (logger .isDebugEnabled ()) {
759+ logger .debug ("new record already exists in HNSW with key={} on layer={}" ,
760+ newPrimaryKey , insertionLayer );
761+ }
762+ }
763+ return new AccessInfoAndNodeExistence (accessInfo , nodeAlreadyExists );
764+ })
765+ .thenCompose (accessInfoAndNodeExistence -> {
766+ if (accessInfoAndNodeExistence .isNodeExists ()) {
767+ return AsyncUtil .DONE ;
768+ }
769+
770+ final AccessInfo accessInfo = accessInfoAndNodeExistence .getAccessInfo ();
759771 final AffineOperator storageTransform = storageTransform (accessInfo );
760772 final Transformed <RealVector > transformedNewVector = storageTransform .transform (newVector );
761773 final Quantizer quantizer = quantizer (accessInfo );
762774 final Estimator estimator = quantizer .estimator ();
763775
776+ final AccessInfo currentAccessInfo ;
764777 if (accessInfo == null ) {
765778 // this is the first node
766779 writeLonelyNodes (quantizer , transaction , newPrimaryKey , transformedNewVector ,
@@ -817,10 +830,24 @@ public CompletableFuture<Void> insert(@Nonnull final Transaction transaction, @N
817830 insertIntoLayers (transaction , storageTransform , quantizer , newPrimaryKey ,
818831 transformedNewVector , nodeReference , lMax , insertionLayer ))
819832 .thenCompose (ignored ->
820- addToStatsIfNecessary (transaction , currentAccessInfo , transformedNewVector ));
833+ addToStatsIfNecessary (random . split (), transaction , currentAccessInfo , transformedNewVector ));
821834 }).thenCompose (ignored -> AsyncUtil .DONE );
822835 }
823836
837+ @ Nonnull
838+ @ VisibleForTesting
839+ CompletableFuture <Boolean > exists (@ Nonnull final ReadTransaction readTransaction ,
840+ @ Nonnull final Tuple primaryKey ) {
841+ final StorageAdapter <? extends NodeReference > storageAdapter = getStorageAdapterForLayer (0 );
842+
843+ //
844+ // Call fetchNode() to check for the node's existence; we are handing in the identity operator, since we don't
845+ // care about the vector itself at all.
846+ //
847+ return storageAdapter .fetchNode (readTransaction , AffineOperator .identity (), 0 , primaryKey )
848+ .thenApply (Objects ::nonNull );
849+ }
850+
824851 /**
825852 * Method to keep stats if necessary. Stats need to be kept and maintained when the client would like to use
826853 * e.g. RaBitQ as RaBitQ needs a stable somewhat correct centroid in order to function properly.
@@ -832,21 +859,23 @@ public CompletableFuture<Void> insert(@Nonnull final Transaction transaction, @N
832859 * in order to finally compute the centroid if {@link Config#getStatsThreshold()} number of vectors have been
833860 * sampled and aggregated. That centroid is then used to update the access info.
834861 *
862+ * @param random the random number generator consulted for the sampling and stats-maintenance decisions
835863 * @param transaction the transaction
836864 * @param currentAccessInfo this current access info that was fetched as part of an insert
837865 * @param transformedNewVector the new vector (in the transformed coordinate system) that may be added
838866 * @return a future that returns {@code null} when completed
839867 */
840868 @ Nonnull
841- private CompletableFuture <Void > addToStatsIfNecessary (@ Nonnull final Transaction transaction ,
869+ private CompletableFuture <Void > addToStatsIfNecessary (@ Nonnull final SplittableRandom random ,
870+ @ Nonnull final Transaction transaction ,
842871 @ Nonnull final AccessInfo currentAccessInfo ,
843872 @ Nonnull final Transformed <RealVector > transformedNewVector ) {
844873 if (getConfig ().isUseRaBitQ () && !currentAccessInfo .canUseRaBitQ ()) {
845- if (shouldSampleVector ()) {
874+ if (shouldSampleVector (random )) {
846875 StorageAdapter .appendSampledVector (transaction , getSubspace (),
847876 1 , transformedNewVector , onWriteListener );
848877 }
849- if (shouldMaintainStats ()) {
878+ if (shouldMaintainStats (random )) {
850879 return StorageAdapter .consumeSampledVectors (transaction , getSubspace (),
851880 50 , onReadListener )
852881 .thenApply (sampledVectors -> {
@@ -1512,6 +1541,15 @@ private StorageAdapter<? extends NodeReference> getStorageAdapterForLayer(final
15121541 getOnReadListener ());
15131542 }
15141543
1544+ @ Nonnull
1545+ private SplittableRandom random (@ Nonnull final Tuple primaryKey ) {
1546+ if (config .isDeterministicSeeding ()) {
1547+ return new SplittableRandom (primaryKey .hashCode ());
1548+ } else {
1549+ return new SplittableRandom (System .nanoTime ());
1550+ }
1551+ }
1552+
15151553 /**
15161554 * Calculates a random layer for a new element to be inserted.
15171555 * <p>
@@ -1521,20 +1559,20 @@ private StorageAdapter<? extends NodeReference> getStorageAdapterForLayer(final
15211559 * is {@code floor(-ln(u) * lambda)}, where {@code u} is a uniform random
15221560 * number and {@code lambda} is a normalization factor derived from a system
15231561 * configuration parameter {@code M}.
1524- *
1562+ * @param random the random number generator from which the uniform random number {@code u} is drawn
15251563 * @return a non-negative integer representing the randomly selected layer.
15261564 */
1527- private int insertionLayer () {
1565+ private int insertionLayer (@ Nonnull final SplittableRandom random ) {
15281566 double lambda = 1.0 / Math .log (getConfig ().getM ());
15291567 double u = 1.0 - random .nextDouble (); // Avoid log(0)
15301568 return (int ) Math .floor (-Math .log (u ) * lambda );
15311569 }
15321570
1533- private boolean shouldSampleVector () {
1571+ private boolean shouldSampleVector (@ Nonnull final SplittableRandom random ) {
15341572 return random .nextDouble () < getConfig ().getSampleVectorStatsProbability ();
15351573 }
15361574
1537- private boolean shouldMaintainStats () {
1575+ private boolean shouldMaintainStats (@ Nonnull final SplittableRandom random ) {
15381576 return random .nextDouble () < getConfig ().getMaintainStatsProbability ();
15391577 }
15401578
@@ -1546,4 +1584,24 @@ private static <T> List<T> drain(@Nonnull Queue<T> queue) {
15461584 }
15471585 return resultBuilder .build ();
15481586 }
1587+
1588+ private static class AccessInfoAndNodeExistence {
1589+ @ Nullable
1590+ private final AccessInfo accessInfo ;
1591+ private final boolean nodeExists ;
1592+
1593+ public AccessInfoAndNodeExistence (@ Nullable final AccessInfo accessInfo , final boolean nodeExists ) {
1594+ this .accessInfo = accessInfo ;
1595+ this .nodeExists = nodeExists ;
1596+ }
1597+
1598+ @ Nullable
1599+ public AccessInfo getAccessInfo () {
1600+ return accessInfo ;
1601+ }
1602+
1603+ public boolean isNodeExists () {
1604+ return nodeExists ;
1605+ }
1606+ }
15491607}
0 commit comments