
recordTtl = Long.MAX_VALUE causes records to expire immediately & no standard exception message in UserRecordFailedException #54

Open
endario opened this issue Jun 7, 2016 · 4 comments · May be fixed by #145


endario commented Jun 7, 2016

I'm fairly new to Kinesis and any help is greatly appreciated.

I pretty much followed the code example on AWS to the letter, but every single callback fails with a null message. I'm pretty stuck now, as I can't find anyone else who has had similar problems.

I ran the program on both Rackspace Linux and Mac OS X with the same result. I've also tried changing the partition key and the record content without success.

I'm using the latest version, 0.10.2, installed via Maven.

I've included the full stack trace below:

[2016-06-07 18:28:54.862884] [0x0000700000081000] [info] [kinesis_producer.cc:79] Created pipeline for stream "my-kinesis-stream"
[2016-06-07 18:28:54.863092] [0x0000700000081000] [info] [shard_map.cc:83] Updating shard map for stream "my-kinesis-stream"
[2016-06-07 18:28:54.988505] [0x00007fff7cc3f000] [info] [shard_map.cc:163] Successfully updated shard map for stream "my-kinesis-stream" found 1 shards
18:28:55.895 [kpl-callback-pool-0-thread-0] WARN c.s.kinesis.KinesisRecordProducer - Failed to send record '1465320533000' to Kinesis.
com.amazonaws.services.kinesis.producer.UserRecordFailedException: null
at com.amazonaws.services.kinesis.producer.KinesisProducer$MessageHandler.onPutRecordResult(KinesisProducer.java:188) [amazon-kinesis-producer-0.10.2.jar:na]
at com.amazonaws.services.kinesis.producer.KinesisProducer$MessageHandler.access$000(KinesisProducer.java:127) [amazon-kinesis-producer-0.10.2.jar:na]
at com.amazonaws.services.kinesis.producer.KinesisProducer$MessageHandler$1.run(KinesisProducer.java:134) [amazon-kinesis-producer-0.10.2.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_66]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_66]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_66]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_66]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_66]


endario commented Jun 8, 2016

Turned out to be a lesser issue.

I set recordTtl to Long.MAX_VALUE because the documentation (# Maximum (inclusive): 9223372036854775807) suggests that it's a valid value, but it ended up making every record expire immediately.

Personally I think providing a standard exception message in UserRecordFailedException would help debugging, but the exception did reveal the cause once I enhanced its handling (thanks to existing code examples):

Futures.addCallback(future, new FutureCallback<UserRecordResult>() {
    @Override
    public void onFailure(@Nonnull Throwable throwable) {
        if (throwable instanceof UserRecordFailedException) {
            Attempt last = Iterables.getLast(((UserRecordFailedException) throwable).getResult().getAttempts());
            LOGGER.warn("Failed to put record. Error code '{}' : '{}'.", last.getErrorCode(), last.getErrorMessage());
        }
        LOGGER.warn(String.format("Exception occurred when sending record '%s' to Kinesis", partitionKey), throwable);
    }

    @Override
    public void onSuccess(UserRecordResult result) {
        // Success: nothing to do here.
    }
});
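For context, the TTL itself is set on KinesisProducerConfiguration before building the producer. A minimal sketch, assuming the fluent setRegion/setRecordTtl setters from the KPL Java library; the region and the 30-second value here are illustrative assumptions, not values from this thread:

// Sketch: keep the record TTL finite rather than Long.MAX_VALUE.
KinesisProducerConfiguration config = new KinesisProducerConfiguration()
        .setRegion("us-east-1")    // assumed region, for illustration only
        .setRecordTtl(30000);      // 30 seconds; records buffered longer than this are dropped
KinesisProducer producer = new KinesisProducer(config);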

endario changed the title from "UserRecordFailedException: null every time" to "recordTtl = Long.MAX_VALUE causes records to expire immediately | no standard exception message in UserRecordFailedException" on Jun 8, 2016
endario changed the title again to "recordTtl = Long.MAX_VALUE causes records to expire immediately & no standard exception message in UserRecordFailedException" on Jun 8, 2016
@samuelgmartinez

This is advice, not a comment on the issue itself :)

Be careful setting the TTL that long, because the TTL is meant to discard records when your producer is throttled for making too many requests or putting too much data.

These throttled records are stored in the daemon in an unbounded buffer, so if you have been throttled and you use such a high TTL, the process will eventually crash because it runs out of memory. So you should assume that, if you are throttled, you may lose some records :(

If you expect to hit that limit, you can use multiple shards or, if your records are small enough, use producer aggregation to pack multiple records into a single Kinesis record (see the sketch below).
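As a rough illustration of that suggestion, assuming the aggregation switches exposed by KinesisProducerConfiguration in the KPL Java library (the buffering value is an arbitrary example):

// Sketch: enable KPL aggregation so many small user records share one Kinesis record.
KinesisProducerConfiguration config = new KinesisProducerConfiguration()
        .setAggregationEnabled(true)       // pack multiple user records per Kinesis record
        .setRecordMaxBufferedTime(100);    // example value (ms): how long records may wait to be batched
KinesisProducer producer = new KinesisProducer(config);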



pfifer commented Oct 4, 2016

Thanks for reporting this. We'll look into updating the documentation, and adding validation to prevent setting the record TTL to Long.MAX_VALUE.

I agree with @samuelgmartinez on being very careful with large TTLs.

pfifer added the bug label on Feb 15, 2017

asnare commented Jun 26, 2017

So, um, what is the largest valid value here?

I was just trying to figure this out…

The protobuf definition used to transfer this setting says "unsigned 64-bit". That seems to be a lie everyone is willing to live with:

  • The Java side is signed, which means we're limited to 63 bits;
  • The documented maximum corresponds to the largest value that fits in 63 bits, i.e. the maximum of a signed 64-bit integer.
  • The C++ side punts the value into a duration of type std::chrono::milliseconds. My C++-fu is too antiquated to fully understand what that is, but I think a blind signedness-conversion happens and the underlying value ends up 63-bit signed like the Java side, so all is well in the world.

The point at which things seem to go wrong is the arithmetic for expiry. When a message is queued, the expiry time is calculated by adding the record_ttl value to 'now'. That addition promptly overflows, yielding a time which has already lapsed.
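The same arithmetic is easy to reproduce in a few lines of Java (this is not the KPL code, just the deadline-style calculation it appears to perform):

// Deadline-style arithmetic overflows when the TTL is Long.MAX_VALUE.
long now = System.currentTimeMillis();
long recordTtl = Long.MAX_VALUE;
long deadline = now + recordTtl;        // wraps around to a large negative number
System.out.println(deadline < now);     // prints "true": the record already looks expired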

This bug is a frequent issue with timeout logic. The correct way to handle expiry is:

  1. Record the start time. (Do not calculate the expected expiry time.)
  2. When checking for expiry, calculate the elapsed time: now() - start_time.
  3. Expiry has occurred if elapsed_time >= record_ttl.

Steps 2 and 3 are safe from an overflow perspective, assuming time always increases. This is why it's important for expiry logic to also use a monotonically-increasing clock. (I think that means using std::chrono::steady_clock instead of std::chrono::high_resolution_clock since the latter can go backwards.)
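A minimal Java sketch of that elapsed-time pattern (the C++ daemon would use std::chrono::steady_clock; this just expresses the same idea with Java's monotonic System.nanoTime()):

// Overflow-safe expiry: store the start time and compare elapsed >= ttl; never precompute a deadline.
class ExpiryCheck {
    private final long startNanos = System.nanoTime();   // monotonic, like std::chrono::steady_clock
    private final long ttlMillis;

    ExpiryCheck(long ttlMillis) { this.ttlMillis = ttlMillis; }

    boolean expired() {
        long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000L;  // subtraction, not addition: no overflow
        return elapsedMillis >= ttlMillis;
    }
}

With ttlMillis = Long.MAX_VALUE this check simply never reports expiry, instead of expiring immediately.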

Coming full circle…

  • The current value for the maximum record TTL is probably correct.
  • There's a bug in the expiry logic, which means that large values (including the value suggested in the documentation) trigger an internal integer overflow.

I apologise if I've misunderstood the C++ side of things, but I suspect this is what's going on.

danmactough added a commit to danmactough/amazon-kinesis-producer that referenced this issue Oct 9, 2017