Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
9efdcc6
feat: add balances-watcher interfaces
limitofzero Jun 5, 2026
53e049f
fix(balances): terminate SSE on malformed balance_update
limitofzero Jun 6, 2026
f98d907
Merge branch 'develop' into feat/add-balances-watcher-interfaces
limitofzero Jun 6, 2026
ebeb8d0
Merge branch 'develop' into feat/add-balances-watcher-interfaces
limitofzero Jun 8, 2026
08eb84c
Merge branch 'develop' into feat/add-balances-watcher-interfaces
limitofzero Jun 9, 2026
d9c3c1a
chore: update balances-watcher default URL to staging endpoint
limitofzero Jun 10, 2026
b914309
feat: add bw updater
limitofzero Jun 10, 2026
ef2de4a
fix: address review comments
limitofzero Jun 10, 2026
171c27c
Merge branch 'feat/add-balances-watcher-interfaces' into feat/balance…
limitofzero Jun 10, 2026
12aec76
chore: comments
limitofzero Jun 10, 2026
e9785cf
refactor: replace epoch by cancel flag
limitofzero Jun 10, 2026
10344db
Merge branch 'develop' into feat/balance-watcher-updater-2
limitofzero Jun 16, 2026
ffc4819
feat: remove custom tokens comparator
limitofzero Jun 16, 2026
f339215
refactor: useMemo
limitofzero Jun 16, 2026
1febce9
feat: add eth tracking
limitofzero Jun 16, 2026
ac9903d
fix: change ff name
limitofzero Jun 16, 2026
73520a7
Merge branch 'develop' into feat/balance-watcher-updater-2
limitofzero Jun 16, 2026
2a532c6
fix: skip non-evm networks
limitofzero Jun 16, 2026
f1d0cd4
fix: remove toLowerCase
limitofzero Jun 16, 2026
ab8147b
Merge branch 'feat/balance-watcher-updater-2' of github.com:cowprotoc…
limitofzero Jun 16, 2026
67445bd
Merge branch 'develop' into feat/balance-watcher-updater-2
limitofzero Jun 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ import { ReactNode, useEffect, useMemo, useState } from 'react'

import {
BalancesAndAllowancesUpdater,
BalancesWatcherUpdater,
PRIORITY_TOKENS_REFRESH_INTERVAL,
PriorityTokensUpdater,
} from '@cowprotocol/balances-and-allowances'
import { useFeatureFlags } from '@cowprotocol/common-hooks'
import { isNonEvmChain } from '@cowprotocol/cow-sdk'
import { useWalletInfo } from '@cowprotocol/wallet'

import { useBalancesContext } from 'entities/balancesContext/useBalancesContext'
Expand All @@ -20,6 +23,8 @@ export function CommonPriorityBalancesAndAllowancesUpdater(): ReactNode {
const balancesContext = useBalancesContext()
const balancesAccount = balancesContext.account || account

const { isBwEnabled } = useFeatureFlags()

const priorityTokenAddresses = usePriorityTokenAddresses()
const priorityTokenAddressesAsArray = useMemo(() => {
return Array.from(priorityTokenAddresses.values())
Expand Down Expand Up @@ -52,6 +57,10 @@ export function CommonPriorityBalancesAndAllowancesUpdater(): ReactNode {

const refreshTrigger = useOrdersFilledEventsTrigger()

if (isBwEnabled && !isNonEvmChain(sourceChainId)) {
return <BalancesWatcherUpdater account={balancesAccount} chainId={sourceChainId} />
}

return (
<>
<PriorityTokensUpdater
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
import { Provider, useAtomValue } from 'jotai'
import { useHydrateAtoms } from 'jotai/utils'
import React, { ReactNode } from 'react'

import { NATIVE_CURRENCY_ADDRESS } from '@cowprotocol/common-const'
import { getAddressKey, SupportedChainId } from '@cowprotocol/cow-sdk'

import { act, renderHook } from '@testing-library/react'

import { useBalancesWatcherSession, UseBalancesWatcherSessionParams } from './useBalancesWatcherSession'

import { BalancesSubscription, BalancesWatcherApiError, SubscribeToBalancesEventsParams } from '../balancesWatcher'
import { balancesAtom, BalancesState, DEFAULT_BALANCES_STATE } from '../state/balancesAtom'

jest.mock('../balancesWatcher', () => {
const actual = jest.requireActual('../balancesWatcher')
return {
...actual,
createBalancesWatcherSession: jest.fn(),
subscribeToBalancesEvents: jest.fn(),
}
})

const balancesWatcherModule = jest.requireMock('../balancesWatcher') as {
createBalancesWatcherSession: jest.Mock
subscribeToBalancesEvents: jest.Mock
}
const mockCreateSession = balancesWatcherModule.createBalancesWatcherSession
const mockSubscribe = balancesWatcherModule.subscribeToBalancesEvents

interface Deferred<T> {
promise: Promise<T>
resolve: (value: T) => void
reject: (reason?: unknown) => void
}

function deferred<T>(): Deferred<T> {
let resolve!: (value: T) => void
let reject!: (reason?: unknown) => void
const promise = new Promise<T>((res, rej) => {
resolve = res
reject = rej
})
return { promise, resolve, reject }
}

const ACCOUNT = '0x1234567890123456789012345678901234567890'
const TOKEN_A = '0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48'
const TOKEN_B = '0xdAC17F958D2ee523a2206206994597C13D831ec7'

function makeParams(overrides: Partial<UseBalancesWatcherSessionParams> = {}): UseBalancesWatcherSessionParams {
return {
account: ACCOUNT,
chainId: SupportedChainId.MAINNET,
tokensListsUrls: ['https://example.com/tokens.json'],
customTokens: [],
...overrides,
}
}

let currentInitialBalances: BalancesState = DEFAULT_BALANCES_STATE

function HydrateAtoms({ children }: { children: ReactNode }): ReactNode {
useHydrateAtoms([[balancesAtom, currentInitialBalances]])
return <>{children}</>
}

function Wrapper({ children }: { children: ReactNode }): ReactNode {
return (
<Provider>
<HydrateAtoms>{children}</HydrateAtoms>
</Provider>
)
}

function renderSession(
initialParams: UseBalancesWatcherSessionParams = makeParams(),
initialBalances: BalancesState = DEFAULT_BALANCES_STATE,
): ReturnType<typeof renderHook<BalancesState, { params: UseBalancesWatcherSessionParams }>> {
currentInitialBalances = initialBalances
return renderHook(
({ params }: { params: UseBalancesWatcherSessionParams }) => {
useBalancesWatcherSession(params)
return useAtomValue(balancesAtom)
},
{ wrapper: Wrapper, initialProps: { params: initialParams } },
)
}

function capturedSubscribeParams(): SubscribeToBalancesEventsParams {
const calls = mockSubscribe.mock.calls
expect(calls.length).toBeGreaterThan(0)
return calls[calls.length - 1][0] as SubscribeToBalancesEventsParams
}

describe('useBalancesWatcherSession', () => {
beforeEach(() => {
jest.clearAllMocks()
mockCreateSession.mockReturnValue(Promise.resolve())
mockSubscribe.mockReturnValue({ close: jest.fn() } satisfies BalancesSubscription)
})

it('does not create a session when account is undefined', () => {
renderSession(makeParams({ account: undefined }))

expect(mockCreateSession).not.toHaveBeenCalled()
expect(mockSubscribe).not.toHaveBeenCalled()
})

it('does not create a session when both lists and customTokens are empty', () => {
renderSession(makeParams({ tokensListsUrls: [], customTokens: [] }))

expect(mockCreateSession).not.toHaveBeenCalled()
})

it('does not create a session for a non-EVM chain (Solana)', () => {
renderSession(makeParams({ chainId: SupportedChainId.SOLANA }))

expect(mockCreateSession).not.toHaveBeenCalled()
})

it('creates a session with the expected body and subscribes after it resolves', async () => {
const session = deferred<void>()
mockCreateSession.mockReturnValueOnce(session.promise)

renderSession(makeParams({ customTokens: [getAddressKey(TOKEN_A)] }))

expect(mockCreateSession).toHaveBeenCalledTimes(1)
expect(mockCreateSession).toHaveBeenCalledWith({
chainId: SupportedChainId.MAINNET,
owner: ACCOUNT,
body: {
tokensListsUrls: ['https://example.com/tokens.json'],
customTokens: [getAddressKey(TOKEN_A)],
},
})
expect(mockSubscribe).not.toHaveBeenCalled()

await act(async () => {
session.resolve()
})

expect(mockSubscribe).toHaveBeenCalledTimes(1)
expect(capturedSubscribeParams()).toMatchObject({
chainId: SupportedChainId.MAINNET,
owner: ACCOUNT,
})
})

it('writes the snapshot into balancesAtom (bigint values, normalized address keys, first-load flags)', async () => {
const session = deferred<void>()
mockCreateSession.mockReturnValueOnce(session.promise)

const { result } = renderSession()

await act(async () => {
session.resolve()
})

await act(async () => {
capturedSubscribeParams().onBalances({
[NATIVE_CURRENCY_ADDRESS]: '1000000000000000000',
[TOKEN_A]: '500',
})
})

expect(result.current.values[getAddressKey(NATIVE_CURRENCY_ADDRESS)]).toBe(1000000000000000000n)
expect(result.current.values[getAddressKey(TOKEN_A)]).toBe(500n)
expect(result.current.hasFirstLoad).toBe(true)
expect(result.current.isLoading).toBe(false)
expect(result.current.fromCache).toBe(false)
expect(result.current.error).toBeNull()
expect(result.current.chainId).toBe(SupportedChainId.MAINNET)
})

it('merges a diff into balancesAtom without clearing prior keys', async () => {
const session = deferred<void>()
mockCreateSession.mockReturnValueOnce(session.promise)

const { result } = renderSession()

await act(async () => {
session.resolve()
})
const sub = capturedSubscribeParams()

await act(async () => {
sub.onBalances({ [TOKEN_A]: '100', [TOKEN_B]: '200' })
})
await act(async () => {
sub.onBalances({ [TOKEN_B]: '999' })
})

expect(result.current.values[getAddressKey(TOKEN_A)]).toBe(100n)
expect(result.current.values[getAddressKey(TOKEN_B)]).toBe(999n)
})

it('writes the atom error and clears isLoading on a terminal SSE error', async () => {
const session = deferred<void>()
mockCreateSession.mockReturnValueOnce(session.promise)

const { result } = renderSession()

await act(async () => {
session.resolve()
})
const sub = capturedSubscribeParams()

await act(async () => {
sub.onError(new Error('stream closed by server'), true)
})

expect(result.current.error).toBe('stream closed by server')
expect(result.current.isLoading).toBe(false)
})

it('ignores non-terminal SSE errors (transport is reconnecting)', async () => {
const session = deferred<void>()
mockCreateSession.mockReturnValueOnce(session.promise)

const { result } = renderSession()

await act(async () => {
session.resolve()
})
const sub = capturedSubscribeParams()

await act(async () => {
sub.onError(new Error('transient'), false)
})

expect(result.current.error).toBeNull()
})

it('writes the atom error and clears isLoading when createSession rejects', async () => {
const session = deferred<void>()
mockCreateSession.mockReturnValueOnce(session.promise)

const { result } = renderSession()

await act(async () => {
session.reject(new BalancesWatcherApiError(503, { code: 1, message: 'service unavailable' }))
})

expect(result.current.error).toBe('service unavailable')
expect(result.current.isLoading).toBe(false)
expect(mockSubscribe).not.toHaveBeenCalled()
})

it('closes the subscription on unmount and ignores late events', async () => {
const session = deferred<void>()
mockCreateSession.mockReturnValueOnce(session.promise)
const close = jest.fn()
mockSubscribe.mockReturnValueOnce({ close })

const { result, unmount } = renderSession()

await act(async () => {
session.resolve()
})
const sub = capturedSubscribeParams()

unmount()
expect(close).toHaveBeenCalledTimes(1)

await act(async () => {
sub.onBalances({ [TOKEN_A]: '777' })
})
expect(result.current.values[getAddressKey(TOKEN_A)]).toBeUndefined()
})

it('discards a session whose POST resolves after a chainId change (race-guard)', async () => {
const stale = deferred<void>()
const fresh = deferred<void>()
mockCreateSession.mockReturnValueOnce(stale.promise).mockReturnValueOnce(fresh.promise)

const { result, rerender } = renderSession(makeParams({ chainId: SupportedChainId.MAINNET }))

rerender({ params: makeParams({ chainId: SupportedChainId.ARBITRUM_ONE }) })

// Stale chain=1 session resolves first; it must NOT open a subscription.
await act(async () => {
stale.resolve()
})
expect(mockSubscribe).not.toHaveBeenCalled()

// Fresh chain=42161 session resolves; subscription opens for that chain.
await act(async () => {
fresh.resolve()
})
expect(mockSubscribe).toHaveBeenCalledTimes(1)
expect(capturedSubscribeParams().chainId).toBe(SupportedChainId.ARBITRUM_ONE)

await act(async () => {
capturedSubscribeParams().onBalances({ [TOKEN_A]: '42' })
})
expect(result.current.chainId).toBe(SupportedChainId.ARBITRUM_ONE)
expect(result.current.values[getAddressKey(TOKEN_A)]).toBe(42n)
})
})
Loading
Loading