Skip to content

Commit 3da5280

Browse files
committed
Set BLOB via ScatteringByteChannel (first attempt)
1 parent 1f01142 commit 3da5280

File tree

6 files changed

+99
-19
lines changed

6 files changed

+99
-19
lines changed

mysql-async/README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,10 @@ String | string
9191
Array[Byte] | blob
9292
java.nio.ByteBuffer | blob
9393
io.netty.buffer.ByteBuf | blob
94+
java.nio.channels.ScatteringByteChannel | blob
9495

95-
The maximum size of a blob is 2^24-9 bytes (almost 16 MiB).
96+
The maximum size of a blob you set via Array[Byte], ByteBuffer or ByteBuf is 2^24-9 bytes (almost 16 MiB).
97+
Blobs set via a Channel can be larger; the Channel is read until EOF and then closed.
9698

9799
You don't have to match exact values when sending parameters for your prepared statements; MySQL is usually smart
98100
enough to understand that if you have sent an Int to a `smallint` column it has to truncate the 4 bytes into 2.

mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/binary/BinaryRowEncoder.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package com.github.mauricio.async.db.mysql.binary
1818

19+
import java.nio.channels.ScatteringByteChannel
20+
1921
import io.netty.buffer.ByteBuf
2022
import java.nio.ByteBuffer
2123
import java.nio.charset.Charset
@@ -88,6 +90,7 @@ class BinaryRowEncoder( charset : Charset ) {
8890
case v : java.util.Date => JavaDateEncoder
8991
case v : ByteBuffer => ByteBufferEncoder
9092
case v : ByteBuf => ByteBufEncoder
93+
case v : ScatteringByteChannel => DummyBlobEncoder
9194
}
9295
}
9396
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.github.mauricio.async.db.mysql.binary.encoder
2+
3+
import com.github.mauricio.async.db.mysql.column.ColumnTypes
4+
import io.netty.buffer.ByteBuf
5+
6+
object DummyBlobEncoder extends BinaryEncoder {
7+
8+
def encode(value: Any, buffer: ByteBuf): Unit = {
9+
throw new UnsupportedOperationException()
10+
}
11+
12+
def encodesTo: Int = ColumnTypes.FIELD_TYPE_BLOB
13+
14+
}

mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/codec/MySQLConnectionHandler.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.github.mauricio.async.db.mysql.codec
1818

1919
import java.nio.ByteBuffer
20+
import java.nio.channels.ScatteringByteChannel
2021

2122
import com.github.mauricio.async.db.Configuration
2223
import com.github.mauricio.async.db.general.MutableResultSet
@@ -260,6 +261,7 @@ class MySQLConnectionHandler(
260261
case v : Array[Byte] => v.length > SendLongDataEncoder.LONG_THRESHOLD
261262
case v : ByteBuffer => v.remaining() > SendLongDataEncoder.LONG_THRESHOLD
262263
case v : ByteBuf => v.readableBytes() > SendLongDataEncoder.LONG_THRESHOLD
264+
case _ : ScatteringByteChannel => true
263265

264266
case _ => false
265267
}
@@ -275,9 +277,27 @@ class MySQLConnectionHandler(
275277

276278
case v : ByteBuf =>
277279
sendBuffer(v, statementId, index)
280+
281+
case channel : ScatteringByteChannel =>
282+
sendChannel(channel, statementId, index)
278283
}
279284
}
280285

286+
// TODO this is blocking
287+
private def sendChannel(channel: ScatteringByteChannel, statementId: Array[Byte], paramId: Int) {
288+
var bytesWritten = 0
289+
do {
290+
val dataBuffer = Unpooled.directBuffer(SendLongDataEncoder.INITIAL_BUFFER_SIZE, SendLongDataEncoder.MAX_BUFFER_SIZE)
291+
do {
292+
bytesWritten = dataBuffer.writeBytes(channel, SendLongDataEncoder.MAX_BUFFER_SIZE)
293+
} while (bytesWritten == 0)
294+
if (bytesWritten > 0) {
295+
sendBuffer(dataBuffer, statementId, paramId)
296+
}
297+
} while (bytesWritten > -1)
298+
channel.close()
299+
}
300+
281301
private def sendBuffer(buffer: ByteBuf, statementId: Array[Byte], paramId: Int) {
282302
writeAndHandleError(new SendLongDataMessage(statementId, buffer, paramId))
283303
}

mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/codec/SendLongDataEncoder.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ object SendLongDataEncoder {
1010
val log = Log.get[SendLongDataEncoder]
1111

1212
val LONG_THRESHOLD = 1023
13+
14+
val INITIAL_BUFFER_SIZE = 1024 // 1 KiB
15+
16+
val MAX_BUFFER_SIZE = 1024*1024 // 1 MiB
1317
}
1418

1519
class SendLongDataEncoder

mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/BinaryColumnsSpec.scala

Lines changed: 55 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.github.mauricio.async.db.mysql
22

3-
import org.specs2.mutable.Specification
3+
import java.io.{FileInputStream, FileOutputStream, BufferedOutputStream, File}
4+
5+
import org.specs2.mutable.{After, Specification}
46
import java.util.UUID
57
import java.nio.ByteBuffer
68
import io.netty.buffer.Unpooled
@@ -9,6 +11,17 @@ import com.github.mauricio.async.db.RowData
911

1012
class BinaryColumnsSpec extends Specification with ConnectionHelper {
1113

14+
val createBlobTable =
15+
"""CREATE TEMPORARY TABLE POSTS (
16+
| id INT NOT NULL,
17+
| blob_column LONGBLOB,
18+
| primary key (id))
19+
""".stripMargin
20+
21+
val insertIntoBlobTable = "INSERT INTO POSTS (id,blob_column) VALUES (?,?)"
22+
23+
val selectFromBlobTable = "SELECT id,blob_column FROM POSTS ORDER BY id"
24+
1225
"connection" should {
1326

1427
"correctly load fields as byte arrays" in {
@@ -106,35 +119,43 @@ class BinaryColumnsSpec extends Specification with ConnectionHelper {
106119

107120
}
108121

109-
"support BLOB type with large values" in {
122+
"support BLOB type with long values" in {
110123

111124
val bytes = (1 to 2100).map(_.toByte).toArray
112125

113126
testBlob(bytes)
114127

115128
}
116129

117-
}
130+
"support BLOB type with ScatteringByteChannel input" in new BlobFile {
118131

119-
def testBlob(bytes: Array[Byte]) = {
120-
val create =
121-
"""CREATE TEMPORARY TABLE POSTS (
122-
| id INT NOT NULL,
123-
| blob_column BLOB,
124-
| primary key (id))
125-
""".stripMargin
132+
withConnection {
133+
connection =>
134+
executeQuery(connection, createBlobTable)
135+
136+
val channel = new FileInputStream(blobFile).getChannel
137+
executePreparedStatement(connection, insertIntoBlobTable, 1, channel)
138+
139+
val Some(rows) = executeQuery(connection, selectFromBlobTable).rows
140+
rows(0)("id") === 1
141+
val retrievedBlob = rows(0)("blob_column").asInstanceOf[Array[Byte]]
142+
retrievedBlob.length === BlobSize
143+
0 to retrievedBlob.length-1 foreach { n => retrievedBlob(n) === n.toByte }
144+
}
126145

127-
val insert = "INSERT INTO POSTS (id,blob_column) VALUES (?,?)"
128-
val select = "SELECT id,blob_column FROM POSTS ORDER BY id"
146+
}
129147

148+
}
149+
150+
def testBlob(bytes: Array[Byte]) = {
130151
withConnection {
131152
connection =>
132-
executeQuery(connection, create)
133-
executePreparedStatement(connection, insert, 1, Some(bytes))
134-
executePreparedStatement(connection, insert, 2, ByteBuffer.wrap(bytes))
135-
executePreparedStatement(connection, insert, 3, Unpooled.wrappedBuffer(bytes))
153+
executeQuery(connection, createBlobTable)
154+
executePreparedStatement(connection, insertIntoBlobTable, 1, Some(bytes))
155+
executePreparedStatement(connection, insertIntoBlobTable, 2, ByteBuffer.wrap(bytes))
156+
executePreparedStatement(connection, insertIntoBlobTable, 3, Unpooled.wrappedBuffer(bytes))
136157

137-
val Some(rows) = executeQuery(connection, select).rows
158+
val Some(rows) = executeQuery(connection, selectFromBlobTable).rows
138159
rows(0)("id") === 1
139160
rows(0)("blob_column") === bytes
140161
rows(1)("id") === 2
@@ -149,4 +170,20 @@ class BinaryColumnsSpec extends Specification with ConnectionHelper {
149170
def compareBytes( row : RowData, column : String, expected : String ) =
150171
row(column) === expected.getBytes(CharsetUtil.UTF_8)
151172

152-
}
173+
}
174+
175+
trait BlobFile extends After {
176+
val BlobSize = (16 * 1024 * 1024)-9
177+
178+
lazy val blobFile = {
179+
val file = File.createTempFile("blob", null)
180+
val bos = new BufferedOutputStream(new FileOutputStream(file))
181+
0 to BlobSize-1 foreach { n => bos.write(n.toByte) }
182+
bos.close()
183+
file
184+
}
185+
186+
def after = {
187+
blobFile.delete()
188+
}
189+
}

0 commit comments

Comments
 (0)