Feature Request
Resolution: Done
Major
Feature request or enhancement
For feature requests or enhancements, provide this information, please:
Which use case/requirement will be addressed by the proposed feature?
The forced buffer flushing severely degrades performance, especially in cases like the following:
In my load test I run a loop of delete, update, and insert operations.
The delete and insert operations target the same primary keys.
So our batch is never filled up, because of the code here:
and here:
The connector flushes the buffer every time an insert arrives after a delete, or vice versa.
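To make the effect concrete, here is a simplified, hypothetical Java model of the behavior described above. It is not the connector's code: deletes and upserts go to two separate buffers, and an incoming event of one type first flushes the other buffer. `Event`, `Op`, `loadTest` and `flushedBatchSizes` are illustrative names, and treating updates and inserts both as upserts is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the flushing behavior described in this issue.
// NOT the connector's code: Event, Op and the buffer handling are illustrative assumptions.
final class FlushOnSwitchModel {

    enum Op { DELETE, UPSERT }

    record Event(int key, Op op) {}

    // Builds the load-test pattern: delete, update, insert on the same key, for each key.
    static List<Event> loadTest(int keys) {
        List<Event> events = new ArrayList<>();
        for (int k = 0; k < keys; k++) {
            events.add(new Event(k, Op.DELETE));
            events.add(new Event(k, Op.UPSERT)); // update
            events.add(new Event(k, Op.UPSERT)); // insert
        }
        return events;
    }

    // Returns the size of every batch written to the database.
    static List<Integer> flushedBatchSizes(List<Event> events, int batchSize) {
        List<Integer> flushed = new ArrayList<>();
        List<Event> deleteBuffer = new ArrayList<>();
        List<Event> upsertBuffer = new ArrayList<>();
        for (Event e : events) {
            List<Event> same = e.op() == Op.DELETE ? deleteBuffer : upsertBuffer;
            List<Event> other = e.op() == Op.DELETE ? upsertBuffer : deleteBuffer;
            // Switching between delete and upsert forces the other buffer out early.
            flush(other, flushed);
            same.add(e);
            if (same.size() >= batchSize) {
                flush(same, flushed);
            }
        }
        // Write whatever is left at the end of the stream.
        flush(deleteBuffer, flushed);
        flush(upsertBuffer, flushed);
        return flushed;
    }

    // Records the buffer's size as one batch and empties it; an empty buffer is a no-op.
    private static void flush(List<Event> buffer, List<Integer> flushed) {
        if (!buffer.isEmpty()) {
            flushed.add(buffer.size());
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        System.out.println(flushedBatchSizes(loadTest(3), 500)); // [1, 2, 1, 2, 1, 2]
    }
}
```

With a batch size of 500, the nine events of three delete/update/insert rounds are written in six batches of one or two records, so the configured batch size is never reached.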
In the end, the traffic on my dashboard panel looked terrible.
I expect the buffer not to be flushed when it isn't necessary.
As a preview: under the same load, without forced flushing (using my code fix), I got this:
Implementation ideas (optional)
The connector has reduction buffer logic: events with the same record key override each other in the buffer. This works well when, for example, all events for a key are upserts, so that only one record per key is eventually flushed to the database within a single batch.
However, a single batch can contain both delete and upsert events for the same key, and the reduction logic does not apply across them, so we end up with two events for that key. This happens because there are two buffers: one for delete events and one for upsert events.
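The two-buffer limitation can be sketched with a hypothetical model (not the connector's classes) in which each event type has its own reduction buffer, a map keyed by record key, and no flush happens on a type switch. Upserts for one key collapse to a single entry, but a later delete for that key lands in the other buffer, leaving two events for the same key. The `Integer` keys and `String` payloads are assumptions for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of two separate reduction buffers (one per event type).
// Names and types are illustrative assumptions, not the connector's classes.
final class PerTypeReductionModel {

    enum Op { DELETE, UPSERT }

    // Each buffer keeps at most one entry per record key; a newer event replaces an older one.
    final Map<Integer, String> deleteBuffer = new LinkedHashMap<>();
    final Map<Integer, String> upsertBuffer = new LinkedHashMap<>();

    void add(int key, Op op, String payload) {
        Map<Integer, String> buffer = op == Op.DELETE ? deleteBuffer : upsertBuffer;
        buffer.put(key, payload); // reduction only happens within this one buffer
    }

    // Counts how many buffered events exist for a key across both buffers.
    int eventsForKey(int key) {
        return (deleteBuffer.containsKey(key) ? 1 : 0) + (upsertBuffer.containsKey(key) ? 1 : 0);
    }

    public static void main(String[] args) {
        PerTypeReductionModel m = new PerTypeReductionModel();
        m.add(1, Op.UPSERT, "v1");
        m.add(1, Op.UPSERT, "v2"); // replaces v1: upserts alone reduce to one record
        System.out.println(m.eventsForKey(1)); // 1
        m.add(1, Op.DELETE, "tombstone");
        System.out.println(m.eventsForKey(1)); // 2: the delete and the upsert both survive
    }
}
```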
Since the connector preserves event order, we can extend the reduction logic so that all events for a key are reduced to a single event within the batch, even when both delete and upsert events are present.
How can we do this? When a new event arrives in one buffer, we remove the outdated event for the same key from the other buffer.
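```java
if (sinkRecordDescriptor.isDelete()) {
    // ...
    if (config.isUseReductionBuffer()) {
        // Drop the outdated upsert for this key instead of flushing
        updateBufferByTable.get(tableId).remove(sinkRecordDescriptor);
    }
    // ...
}
else {
    // ...
    if (config.isUseReductionBuffer()) {
        // Drop the outdated delete for this key instead of flushing
        deleteBufferByTable.get(tableId).remove(sinkRecordDescriptor);
    }
    // ...
}
```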
With this logic, instead of flushing the buffer, we remove the outdated record, so only the latest delete or upsert event for each key remains and no event's effect is lost when the reduction buffer is enabled.
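A minimal sketch of the proposed cross-buffer reduction, as a hypothetical model rather than a patch against the connector: each new event removes the outdated entry for the same key from the other buffer instead of triggering a flush, and the buffers are flushed only when the batch size is reached or the stream ends. The class name, the `Integer` keys and the `String` payloads are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the proposed cross-buffer reduction; names are illustrative assumptions.
final class CrossBufferReductionModel {

    enum Op { DELETE, UPSERT }

    final Map<Integer, String> deleteBuffer = new LinkedHashMap<>();
    final Map<Integer, String> upsertBuffer = new LinkedHashMap<>();
    final List<Integer> flushedBatchSizes = new ArrayList<>();
    private final int batchSize;

    CrossBufferReductionModel(int batchSize) {
        this.batchSize = batchSize;
    }

    void add(int key, Op op, String payload) {
        if (op == Op.DELETE) {
            upsertBuffer.remove(key); // drop the outdated upsert instead of flushing
            deleteBuffer.put(key, payload);
        } else {
            deleteBuffer.remove(key); // drop the outdated delete instead of flushing
            upsertBuffer.put(key, payload);
        }
        // Flush only when the batch is actually full.
        if (deleteBuffer.size() + upsertBuffer.size() >= batchSize) {
            flush();
        }
    }

    // Each key is in at most one buffer, so writing deletes before upserts is safe here.
    void flush() {
        int size = deleteBuffer.size() + upsertBuffer.size();
        if (size > 0) {
            flushedBatchSizes.add(size);
            deleteBuffer.clear();
            upsertBuffer.clear();
        }
    }

    public static void main(String[] args) {
        CrossBufferReductionModel m = new CrossBufferReductionModel(500);
        for (int k = 0; k < 3; k++) {
            m.add(k, Op.DELETE, "delete");
            m.add(k, Op.UPSERT, "update");
            m.add(k, Op.UPSERT, "insert");
        }
        m.flush(); // end of stream
        System.out.println(m.flushedBatchSizes); // [3]
    }
}
```

Three rounds of delete, update, insert on keys 0 to 2 now yield a single batch of three upserts. Because each key ends up in at most one buffer, the order in which the two buffers are written within a batch does not change the final state of any row in this model.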
Full patch for 2.7 here:
I can open a pull request if this implementation idea works for you.