Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-8790

Handle BYTES as VARBINARY in SQLServer sink

XMLWordPrintable

    • Icon: Enhancement Enhancement
    • Resolution: Done
    • Icon: Major Major
    • 3.1.0.CR1
    • None
    • jdbc-connector
    • None

      JDBC sink connector does not handle BYTES as VARBINARY for SQLServer

      What Debezium connector do you use and what version?

      3.0.8

      What is the connector configuration?

      connector.class=io.debezium.connector.jdbc.JdbcSinkConnector
      connection.password=...+
      primary.key.mode=record_key
      tasks.max=1
      transforms=unwrap,route
      connection.username=user
      quote.identifiers=true
      topics.regex=TESTDominoSource\.Domino\.dbo\._del_test_kafka25_2
      transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
      transforms.route.regex=([^.])\.([^.])\.([^.])\.([^.])
      delete.enabled=true
      auto.evolve=true
      transforms.unwrap.drop.tombstones=false
      transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
      auto.create=true
      connection.url=jdbc:sqlserver://server;databaseName=db_uat;encrypt=false
      insert.mode=upsert
      transforms.route.replacement=$4

      What is the captured database version and mode of deployment?

      SQL SERVER 2019

      What behavior do you expect?

      A table with varbinary field is sinked.

      What behavior do you see?

      Connector is failed with error:

       

      Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: The conversion from UNKNOWN to VARBINARY is unsupported

      Do you see the same behaviour using the latest released Debezium version?

      didn.t tested but looking on code -  yes, want to pull request to fix it

      Do you have the connector logs, ideally from start till finish?

      com.microsoft.sqlserver.jdbc.SQLServerException: The conversion from UNKNOWN to VARBINARY is unsupported.
              at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDriverError(SQLServerException.java:231)
              at com.microsoft.sqlserver.jdbc.DataTypes.throwConversionError(DataTypes.java:1118)
              at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.setObject(SQLServerPreparedStatement.java:1710)
              at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.setObject(SQLServerPreparedStatement.java:1629)
              at com.mchange.v2.c3p0.impl.NewProxyPreparedStatement.setObject(NewProxyPreparedStatement.java:769)
              at io.debezium.connector.jdbc.PreparedStatementQueryBinder.bind(PreparedStatementQueryBinder.java:32)
              at java.base/java.lang.Iterable.forEach(Iterable.java:75)
              at io.debezium.connector.jdbc.RecordWriter.bindFieldValuesToQuery(RecordWriter.java:152)
              at io.debezium.connector.jdbc.RecordWriter.bindNonKeyValuesToQuery(RecordWriter.java:136)
              at io.debezium.connector.jdbc.RecordWriter.bindValues(RecordWriter.java:111)
              at io.debezium.connector.jdbc.RecordWriter.lambda$processBatch$0(RecordWriter.java:72)
              at org.hibernate.jdbc.WorkExecutor.executeWork(WorkExecutor.java:39)
              at org.hibernate.internal.AbstractSharedSessionContract.lambda$doWork$4(AbstractSharedSessionContract.java:1042)
              at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.coordinateWork(JdbcCoordinatorImpl.java:303)
              at org.hibernate.internal.AbstractSharedSessionContract.doWork(AbstractSharedSessionContract.java:1053)
              at org.hibernate.internal.AbstractSharedSessionContract.doWork(AbstractSharedSessionContract.java:1041)
              at io.debezium.connector.jdbc.RecordWriter.write(RecordWriter.java:50)
              at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:218)
              at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBufferWithRetries(JdbcChangeEventSink.java:192)
              at io.debezium.connector.jdbc.JdbcChangeEventSink.lambda$flushBuffers$2(JdbcChangeEventSink.java:172)
              at java.base/java.util.LinkedHashMap.forEach(LinkedHashMap.java:986)
              at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffers(JdbcChangeEventSink.java:172)
              at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:151)
              at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:127)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:606)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:345)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:247)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:216)
              at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:226)
              at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:281)
              at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:238)
              at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
              at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
              at java.base/java.lang.Thread.run(Thread.java:1583)

      How to reproduce the issue using our tutorial deployment?

      1. create a source table with varbinary column
      2. setup a JDBC sink with similar config above, create empty target table with varbinary field
      3. run JDBC Sink

       

              Unassigned Unassigned
              yuriyv Yuriy Vikulov (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

                Created:
                Updated:
                Resolved: