I want to back up the raw data I receive from a Kafka stream. I've created my own implementation of RichSinkFunction called S3Sink.
Inside invoke I add each record to a buffer, and when the buffer is full I flush the data to S3.
    public void invoke(String s) throws Exception {
        addToBuffer(s);
        if (memoryBuffer.size() == 2000) {
            flushToS3();
        }
    }
However, as you are probably aware, an S3 upload can and eventually will fail. That's why I want to commit the Kafka offset (checkpoint the pointer) only if the upload did not throw an exception, like this:
    try {
        s3Client.putObject(new PutObjectRequest(...));
    } catch (AmazonServiceException e) {
        commitOffset = false;
    }
    if (commitOffset) {
        // Commit Kafka offset.
    } else {
        // Clear the buffer and don't commit the offset,
        // so the records will be picked up the next time the sink is called.
    }
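The intended behaviour can be sketched without Flink or the AWS SDK, roughly as follows. This is only an illustration under assumptions: `Uploader` and `BufferedBackup` are hypothetical names standing in for the S3 client call and the sink's buffering logic, and the boolean returned by `add` merely marks the point where committing an offset would be safe. It does not show how (or whether) a Flink sink can actually commit Kafka offsets.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the S3 upload; not part of Flink or the AWS SDK.
interface Uploader {
    void upload(List<String> batch) throws Exception;
}

// Hypothetical helper: buffers records and flushes them in fixed-size batches.
class BufferedBackup {
    private final List<String> memoryBuffer = new ArrayList<>();
    private final int batchSize;
    private final Uploader uploader;

    BufferedBackup(int batchSize, Uploader uploader) {
        this.batchSize = batchSize;
        this.uploader = uploader;
    }

    // Returns true only when a full batch was uploaded successfully,
    // i.e. the only moment at which committing an offset would be safe.
    boolean add(String record) {
        memoryBuffer.add(record);
        if (memoryBuffer.size() < batchSize) {
            return false; // still filling the buffer, nothing to commit yet
        }
        try {
            // Pass a copy so clearing the buffer does not affect the uploaded batch.
            uploader.upload(new ArrayList<>(memoryBuffer));
        } catch (Exception e) {
            // Upload failed: drop the batch and do not signal a commit,
            // so the records have to be re-read from the source.
            memoryBuffer.clear();
            return false;
        }
        memoryBuffer.clear();
        return true;
    }

    // Number of records currently waiting in the buffer.
    int buffered() {
        return memoryBuffer.size();
    }
}
```

With a batch size of 2, the first `add` returns false, the second returns true if the upload succeeds, and false (with an empty buffer) if the upload throws.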
I'm aware of some workarounds, such as dumping the buffer to a file on failure and retrying later. But I would like to keep it simple, so that everything is handled inside one Sink.
Is it possible to accomplish what I'm trying to do? Or am I overthinking this solution?
Cheers,
M.