Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ensure writes are always atomic #97

Closed
wants to merge 11 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 66 additions & 54 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ const fs = require('fs')
const EventEmitter = require('events')
const inherits = require('util').inherits
const path = require('path')
const assert = require('assert')
const sleep = require('atomic-sleep')

const BUSY_WRITE_TIMEOUT = 100

const sleep = require('atomic-sleep')

// 16 MB - magic number
// This constant ensures that SonicBoom only needs
// 32 MB of free memory to run. In case of having 1GB+
Expand Down Expand Up @@ -59,8 +59,7 @@ function openFile (file, sonic) {
}

// start
const len = sonic._buf.length
if (len > 0 && len > sonic.minLength && !sonic.destroyed) {
if (!sonic._writing && sonic._len > sonic.minLength && !sonic.destroyed) {
actualWrite(sonic)
}
}
Expand Down Expand Up @@ -94,7 +93,8 @@ function SonicBoom (opts) {

fd = fd || dest

this._buf = ''
this._bufs = []
this._len = 0
this.fd = -1
this._writing = false
this._writingBuf = ''
Expand All @@ -107,6 +107,7 @@ function SonicBoom (opts) {
this.sync = sync || false
this.append = append || false
this.mkdir = mkdir || false
this._againTimeout = null

if (typeof fd === 'number') {
this.fd = fd
Expand All @@ -133,51 +134,46 @@ function SonicBoom (opts) {
}
} else {
// Let's give the destination some time to process the chunk.
setTimeout(() => {
this._againTimeout = setTimeout(() => {
this._againTimeout = null
fs.write(this.fd, this._writingBuf, 'utf8', this.release)
}, BUSY_WRITE_TIMEOUT)
}
} else {
// The error maybe recoverable later, so just put data back to this._buf
this._buf = this._writingBuf + this._buf
this._writingBuf = ''
this._writing = false

this.emit('error', err)
}
return
}

if (this._writingBuf.length !== n) {
this._writingBuf = this._writingBuf.slice(n)
if (this.sync) {
try {
do {
n = fs.writeSync(this.fd, this._writingBuf, 'utf8')
this._writingBuf = this._writingBuf.slice(n)
} while (this._writingBuf.length !== 0)
} catch (err) {
this.release(err)
return
}
} else {
this._len -= n
this._writingBuf = this._writingBuf.slice(n)

if (this._writingBuf.length) {
if (!this.sync) {
fs.write(this.fd, this._writingBuf, 'utf8', this.release)
return
}
}

this._writingBuf = ''

if (this.destroyed) {
return
try {
do {
const n = fs.writeSync(this.fd, this._writingBuf, 'utf8')
this._len -= n
this._writingBuf = this._writingBuf.slice(n)
} while (this._writingBuf)
} catch (err) {
this.release(err)
return
}
}

const len = this._buf.length
const len = this._len
if (this._reopening) {
this._writing = false
this._reopening = false
this.reopen()
} else if (len > 0 && len > this.minLength) {
} else if (len > this.minLength) {
actualWrite(this)
} else if (this._ending) {
if (len > 0) {
Expand Down Expand Up @@ -220,12 +216,26 @@ SonicBoom.prototype.write = function (data) {
throw new Error('SonicBoom destroyed')
}

this._buf += data
const len = this._buf.length
if (!this._writing && len > this.minLength) {
const len = this._len + data.length

if (!this._writing && len > MAX_WRITE) {
this._bufs.push('')
}

if (!this._bufs.length) {
this._bufs.push('')
}

assert(this._bufs.length)

this._bufs[this._bufs.length - 1] += data
this._len = len

if (!this._writing && this._len > this.minLength) {
actualWrite(this)
}
return len < 16384

return len < 163874
}

SonicBoom.prototype.flush = function () {
Expand Down Expand Up @@ -298,16 +308,15 @@ SonicBoom.prototype.end = function () {

this._ending = true

if (!this._writing && this._buf.length > 0 && this.fd >= 0) {
actualWrite(this)
return
}

if (this._writing) {
return
}

actualClose(this)
if (this._len > 0 && this.fd >= 0) {
actualWrite(this)
} else {
actualClose(this)
}
}

SonicBoom.prototype.flushSync = function () {
Expand All @@ -319,10 +328,16 @@ SonicBoom.prototype.flushSync = function () {
throw new Error('sonic boom is not ready yet')
}

while (this._buf.length > 0) {
if (this._againTimeout) {
this._againTimeout = null
this._bufs.unshift(this._writingBuf)
this._writingBuf = ''
}

while (this._bufs.length) {
try {
fs.writeSync(this.fd, this._buf, 'utf8')
this._buf = ''
this._len -= fs.writeSync(this.fd, this._bufs[0], 'utf8')
this._bufs.shift()
} catch (err) {
if (err.code !== 'EAGAIN') {
throw err
Expand All @@ -341,25 +356,22 @@ SonicBoom.prototype.destroy = function () {
}

function actualWrite (sonic) {
sonic._writing = true
let buf = sonic._buf
const release = sonic.release
if (buf.length > MAX_WRITE) {
buf = buf.slice(0, MAX_WRITE)
sonic._buf = sonic._buf.slice(MAX_WRITE)
} else {
sonic._buf = ''
}
sonic._writingBuf = buf
sonic._writing = true
sonic._writingBuf = sonic._writingBuf || sonic._bufs.shift()

assert(sonic._len)
assert(sonic._writingBuf)

if (sonic.sync) {
try {
const written = fs.writeSync(sonic.fd, buf, 'utf8')
const written = fs.writeSync(sonic.fd, sonic._writingBuf, 'utf8')
release(null, written)
} catch (err) {
release(err)
}
} else {
fs.write(sonic.fd, buf, 'utf8', release)
fs.write(sonic.fd, sonic._writingBuf, 'utf8', release)
}
}

Expand All @@ -381,7 +393,7 @@ function actualClose (sonic) {
sonic.emit('close')
})
sonic.destroyed = true
sonic._buf = ''
sonic._bufs = []
}

/**
Expand Down