Skip to content

Commit 3d57904

Browse files
authored
Handle closed user streams properly (#499)
1 parent a26881a commit 3d57904

File tree

1 file changed

+21
-5
lines changed

1 file changed

+21
-5
lines changed

src/websocket.js

+21-5
Original file line numberDiff line numberDiff line change
@@ -609,6 +609,10 @@ export const userEventHandler = (cb, transform = true, variator) => msg => {
609609
)
610610
}
611611

612+
const userErrorHandler = (cb, transform = true) => error => {
613+
cb({[transform ? 'eventType' : 'type']: 'error', error})
614+
}
615+
612616
const STREAM_METHODS = ['get', 'keep', 'close']
613617

614618
const capitalize = (str, check) => (check ? `${str[0].toUpperCase()}${str.slice(1)}` : str)
@@ -630,22 +634,28 @@ const user = (opts, variator) => (cb, transform) => {
630634
let currentListenKey = null
631635
let int = null
632636
let w = null
637+
let keepClosed = false
638+
let errorHandler = userErrorHandler(cb, transform)
633639

634640
const keepAlive = isReconnecting => {
635641
if (currentListenKey) {
636-
keepStreamAlive(keepDataStream, currentListenKey).catch(() => {
642+
keepStreamAlive(keepDataStream, currentListenKey).catch(err => {
637643
closeStream({}, true)
638644

639645
if (isReconnecting) {
640646
setTimeout(() => makeStream(true), 30e3)
641647
} else {
642648
makeStream(true)
643649
}
650+
651+
opts.emitStreamErrors && errorHandler(err)
644652
})
645653
}
646654
}
647655

648-
const closeStream = (options, catchErrors) => {
656+
const closeStream = (options, catchErrors = false, setKeepClosed = false) => {
657+
keepClosed = setKeepClosed
658+
649659
if (currentListenKey) {
650660
clearInterval(int)
651661

@@ -661,24 +671,30 @@ const user = (opts, variator) => (cb, transform) => {
661671
}
662672

663673
const makeStream = isReconnecting => {
664-
return getDataStream()
674+
return !keepClosed && getDataStream()
665675
.then(({ listenKey }) => {
676+
if (keepClosed) {
677+
return closeDataStream({ listenKey }).catch(f => f)
678+
}
679+
666680
w = openWebSocket(
667681
`${variator === 'futures' ? endpoints.futures : endpoints.base}/${listenKey}`,
668682
)
669683
w.onmessage = msg => userEventHandler(cb, transform, variator)(msg)
684+
opts.emitSocketErrors && (w.onerror = ({error}) => errorHandler(error))
670685

671686
currentListenKey = listenKey
672687

673688
int = setInterval(() => keepAlive(false), 50e3)
674689

675690
keepAlive(true)
676691

677-
return options => closeStream(options)
692+
return options => closeStream(options, false, true)
678693
})
679694
.catch(err => {
680695
if (isReconnecting) {
681-
setTimeout(() => makeStream(true), 30e3)
696+
!keepClosed && setTimeout(() => makeStream(true), 30e3)
697+
opts.emitStreamErrors && errorHandler(err)
682698
} else {
683699
throw err
684700
}

0 commit comments

Comments
 (0)