Enhancement
Resolution: Done
Major
JDBC sink connector does not handle BYTES as VARBINARY for SQLServer
What Debezium connector do you use and what version?
3.0.8
What is the connector configuration?
connector.class=io.debezium.connector.jdbc.JdbcSinkConnector
connection.password=...+
primary.key.mode=record_key
tasks.max=1
transforms=unwrap,route
connection.username=user
quote.identifiers=true
topics.regex=TESTDominoSource\.Domino\.dbo\._del_test_kafka25_2
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=([^.]+)\.([^.]+)\.([^.]+)\.([^.]+)
delete.enabled=true
auto.evolve=true
transforms.unwrap.drop.tombstones=false
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
auto.create=true
connection.url=jdbc:sqlserver://server;databaseName=db_uat;encrypt=false
insert.mode=upsert
transforms.route.replacement=$4
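To make the routing part of this configuration concrete, here is a minimal sketch of how the `route` transform would map the topic name to a table name. It assumes each capture group is `([^.]+)` (one or more non-dot characters) and imitates the transform with plain `java.util.regex`: the topic is replaced only when the whole name matches. `RouteSketch` and `route` are hypothetical names used for illustration, not part of Kafka Connect.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RouteSketch {

    // Returns the rewritten topic when the whole topic matches the regex,
    // otherwise returns the topic unchanged (assumed RegexRouter-like behavior).
    static String route(String regex, String replacement, String topic) {
        Matcher matcher = Pattern.compile(regex).matcher(topic);
        return matcher.matches() ? matcher.replaceFirst(replacement) : topic;
    }

    public static void main(String[] args) {
        String regex = "([^.]+)\\.([^.]+)\\.([^.]+)\\.([^.]+)";
        String topic = "TESTDominoSource.Domino.dbo._del_test_kafka25_2";
        // "$4" keeps only the fourth dot-separated segment, i.e. the table name.
        System.out.println(route(regex, "$4", topic));
    }
}
```

Under these assumptions the sink would target a table named after the last segment of the topic, which is the table that must contain the varbinary column.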
What is the captured database version and mode of deployment?
SQL SERVER 2019
What behavior do you expect?
A table with a varbinary column is written to the sink database.
What behavior do you see?
The connector fails with the error:
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: The conversion from UNKNOWN to VARBINARY is unsupported
Do you see the same behaviour using the latest released Debezium version?
Not tested, but judging from the code, yes. I would like to open a pull request to fix it.
Do you have the connector logs, ideally from start till finish?
com.microsoft.sqlserver.jdbc.SQLServerException: The conversion from UNKNOWN to VARBINARY is unsupported.
at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDriverError(SQLServerException.java:231)
at com.microsoft.sqlserver.jdbc.DataTypes.throwConversionError(DataTypes.java:1118)
at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.setObject(SQLServerPreparedStatement.java:1710)
at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.setObject(SQLServerPreparedStatement.java:1629)
at com.mchange.v2.c3p0.impl.NewProxyPreparedStatement.setObject(NewProxyPreparedStatement.java:769)
at io.debezium.connector.jdbc.PreparedStatementQueryBinder.bind(PreparedStatementQueryBinder.java:32)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at io.debezium.connector.jdbc.RecordWriter.bindFieldValuesToQuery(RecordWriter.java:152)
at io.debezium.connector.jdbc.RecordWriter.bindNonKeyValuesToQuery(RecordWriter.java:136)
at io.debezium.connector.jdbc.RecordWriter.bindValues(RecordWriter.java:111)
at io.debezium.connector.jdbc.RecordWriter.lambda$processBatch$0(RecordWriter.java:72)
at org.hibernate.jdbc.WorkExecutor.executeWork(WorkExecutor.java:39)
at org.hibernate.internal.AbstractSharedSessionContract.lambda$doWork$4(AbstractSharedSessionContract.java:1042)
at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.coordinateWork(JdbcCoordinatorImpl.java:303)
at org.hibernate.internal.AbstractSharedSessionContract.doWork(AbstractSharedSessionContract.java:1053)
at org.hibernate.internal.AbstractSharedSessionContract.doWork(AbstractSharedSessionContract.java:1041)
at io.debezium.connector.jdbc.RecordWriter.write(RecordWriter.java:50)
at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:218)
at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBufferWithRetries(JdbcChangeEventSink.java:192)
at io.debezium.connector.jdbc.JdbcChangeEventSink.lambda$flushBuffers$2(JdbcChangeEventSink.java:172)
at java.base/java.util.LinkedHashMap.forEach(LinkedHashMap.java:986)
at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffers(JdbcChangeEventSink.java:172)
at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:151)
at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:127)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:606)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:345)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:247)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:216)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:226)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:281)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:238)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
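The trace ends in `SQLServerPreparedStatement.setObject`, reached from `PreparedStatementQueryBinder.bind`. The report does not say which Java type the BYTES value had at that point. Kafka Connect allows BYTES values to be either `byte[]` or `java.nio.ByteBuffer`, so one possible explanation is that a value the driver could not classify reached `setObject`. Under that assumption, a sink could normalize the value to `byte[]` before binding. The sketch below is illustrative only: `BytesNormalizer` and `toByteArray` are hypothetical names, not Debezium API, and this is not necessarily how the issue was resolved.

```java
import java.nio.ByteBuffer;

public class BytesNormalizer {

    // Converts a Kafka Connect BYTES value (byte[] or ByteBuffer) to byte[].
    // Hypothetical helper for illustration; not taken from the Debezium code base.
    static byte[] toByteArray(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof byte[]) {
            return (byte[]) value;
        }
        if (value instanceof ByteBuffer) {
            // duplicate() shares the content but has its own position,
            // so reading here does not disturb the caller's buffer.
            ByteBuffer copy = ((ByteBuffer) value).duplicate();
            byte[] bytes = new byte[copy.remaining()];
            copy.get(bytes);
            return bytes;
        }
        throw new IllegalArgumentException(
                "Unsupported BYTES value type: " + value.getClass().getName());
    }
}
```

The resulting array could then be bound with the standard JDBC call `PreparedStatement.setBytes(index, bytes)`, which avoids asking the driver to infer the source type.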
How to reproduce the issue using our tutorial deployment?
- Create a source table with a varbinary column.
- Set up a JDBC sink with a configuration similar to the one above, and create an empty target table with a varbinary column.
- Run the JDBC sink.