Missing SQS messages

I’m not sure whether this is a gap in my understanding of how SQS queues work or an actual bug, but here is the issue:
I’m trying to use an SQS queue to run multiple Lambdas in parallel to save time. I followed the tutorial and ended up with two Jobs: one that saves entries in the database and adds messages with those ids to SQS, and another that is triggered by SQS, takes the given id, and does some processing.
Here are my two Jobs:

class DispatcherJob < ApplicationJob
  include Jets::AwsServices

  iam_policy 'sqs'
  def dispatch
    Jets.logger.info("Dispatched from job: #{event}.")
    queue_url = List.lookup(:waitlist_url)
    message_body = JSON.dump(event)
    sqs.send_message(queue_url:, message_body:)
  end
end

and

class CourierJob < ApplicationJob
  class_timeout 30
  depends_on :list
  sqs_event ref(:waitlist)
  def process
    result = ProcessEmail.new.call(event)

    if result.success?
      Jets.logger.info("Processed email #{result.success}.")
    else
      Jets.logger.error("Failed email #{result.failure}")
    end
  end
end

Now, I’m logging everything because I can’t find where the messages go missing between the first job and the second. When I send 6 objects to Dispatch, I get only 5 processed objects. With 10, I get 7-8 processed… I checked the database and everything seems correct; the entries that are not logged are simply missing the data set in the processing step.
Am I even using this as intended? Is there a better way, and what am I missing about SQS queues?

P.S. I tried adding rescue clauses with logging everywhere, but no errors are raised.

Unsure. Put together a demo test project.

The Readme covers how I tested it. Here’s the relevant part.

Session Output

❯ jets console
Jets booting up in development mode!
irb(main):001:0> 5.times { |i| DispatcherJob.perform_now(:dispatch, test: "message #{i+1}") }
Dispatched from job: {"test"=>"message 1"}
Dispatched from job: {"test"=>"message 2"}
Dispatched from job: {"test"=>"message 3"}
Dispatched from job: {"test"=>"message 4"}
Dispatched from job: {"test"=>"message 5"}
=> 5
irb(main):002:0>

Tail with a grep test

❯ jets logs -f -n courier_job-process | grep test
Tailing logs for /aws/lambda/testapp-dev-courier_job-process
2023-12-21 15:18:56 UTC CourierJob sqs_event_payload: {"test"=>"message 1"}
2023-12-21 15:18:56 UTC CourierJob sqs_event_payload: {"test"=>"message 2"}
2023-12-21 15:18:56 UTC CourierJob sqs_event_payload: {"test"=>"message 3"}
2023-12-21 15:18:56 UTC CourierJob sqs_event_payload: {"test"=>"message 4"}
2023-12-21 15:18:56 UTC CourierJob sqs_event_payload: {"test"=>"message 5"}

It confirms that messages are being delivered.

Related Though a Little Off Topic

I also noticed this post: Store Shared Resource Name Store?. There’s a caveat with the List.lookup method.

The lookup method gets the SQS queue info by making an API call to CloudFormation to grab the stack output. The poster there was running into CloudFormation rate limits and ended up storing the info in an SSM parameter store. You could also store it in a config file to look up. It’s not ideal, but there are ways.
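
For illustration, a minimal sketch of the SSM approach, assuming the queue URL was written to a parameter at deploy time (the parameter name /demo/waitlist_url is made up):

require "aws-sdk-ssm"

# Read the queue URL from SSM Parameter Store instead of calling
# CloudFormation on every invocation.
def waitlist_url
  ssm = Aws::SSM::Client.new
  ssm.get_parameter(name: "/demo/waitlist_url").parameter.value
end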

I’m also thinking it’ll be good to add some caching for the lookup call. Jets can cache it in the “cold start/shared memory” space, so it’s only looked up once until the Lambda function has been “rotated” or cold-started. Folks have measured cold-start frequency and report that it’s 45-60m. That should reduce the CloudFormation calls dramatically, since the CloudFormation describe call would only happen once every 45-60m instead of on every request.
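
A rough sketch of that caching idea (not Jets’ actual implementation): memoize the result at the process level, so warm invocations reuse it and only a cold start triggers a fresh CloudFormation call.

module LookupCache
  @cache = {}

  # Yields (and caches) only on the first call per Lambda container,
  # i.e. once per cold start.
  def self.fetch(key)
    @cache[key] ||= yield
  end
end

queue_url = LookupCache.fetch(:waitlist_url) { List.lookup(:waitlist_url) }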

Update: Whoa. Wait a minute. I think you’re right. I see drops. It’s tricky because it seems to happen only during a cold start.

When there’s a cold start, I’ll see 2/5 logs instead of all 5. When there’s no cold start, I always see 5 logs.

How I’m reproducing this:

  1. Go into the Lambda console

  2. Change the handlers/jobs/courier_job.rb to create a “cold-start”

  3. Run the jets console test

  4. Look at the logs

I also simplified the handler shim in the Lambda console to a bare minimum without Jets.

handlers/jobs/courier_job.rb

def process(event:, context:)
  puts "simple shim test event #{event}"
end

It’s quite simple. Jets is not even involved. So it might be an AWS thing.

Wondering:

  • If the messages are actually delivering, but maybe CloudWatch is not logging? IE: some concurrency rate limit issue with CloudWatch.

  • An idea: Try adding an SNS topic to the code and delivering a message. Subscribe to it via email and see if you get 5 emails (see the sketch after this list).

  • If that is the case, that would mean it’s just CloudWatch logs.
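
For reference, a minimal sketch of that SNS test using aws-sdk-sns (the environment variable name for the topic ARN is assumed):

require "aws-sdk-sns"

sns = Aws::SNS::Client.new
sns.publish(
  topic_arn: ENV["SNS_TOPIC_ARN"],        # assumed: set to your topic's ARN
  message: "courier test #{Time.now.utc}" # shows up in the email body
)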

It might be good to create a ticket with AWS. Will have to take a closer look in a bit though.

Update: SQS messages are delivering and the Lambda function is running; it’s just CloudWatch not always logging on a cold-start. Details below.

I updated the test project https://github.com/tongueroo/demo-sqs-courier to also publish to an SNS topic.

I then manually subscribed my email to the topic so I could see whether or not all the SQS messages are triggering the Lambda functions.

Here’s what’s going on:

  • All the SQS messages are delivered.

  • The Lambda function is always firing.

  • The SNS topic is always publishing.

  • CloudWatch is not always logging on a cold-start.

Here’s a screenshot link: https://bit.ly/473XChi (Currently, the forum images are broken. They’re uploading fine but are not displaying. I haven’t had the time to fix it.)

I can see that all 5 SNS messages are delivered. However, depending on whether or not the Lambda function is a cold-start, I only see 2 or 3 of the 5 log messages in CloudWatch. Yup. It’s annoying.

What’s probably happening is that the Lambda function finishes just fast enough that the AWS Lambda service running it “shuts down” the process before the CloudWatch process or thread it uses gets a chance to send the logs to CloudWatch. I’m guessing it’s a race condition of some sort. I believe this is an AWS Lambda + CloudWatch + cold-start issue. It may be worth raising a ticket with AWS so they’re aware.

One thing you can do is throw in a sleep 10, which gives Lambda enough time to send the logs even during a cold start. It’s a hack, but it would probably work.
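
For illustration, the hack would look something like this (a sketch, not something for production):

class CourierJob < ApplicationJob
  sqs_event ref(:waitlist)
  def process
    # ... actual processing ...
    sleep 10 # hack: keep the function alive long enough to flush logs on a cold start
  end
end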

Also, I ran into an annoying behavior with Jets Events and user IAM policies. Fixed it here: https://github.com/rubyonjets/jets/pull/698 It’s released in Jets 5.0.6, and the demo project above requires 5.0.6. Otherwise, you have to use something like iam_policy("sns", "sqs") instead of iam_policy "sns".

Interesting Note

Just an interesting observation. The SQS messages are always processed in order after the cold start. With a newly deployed Lambda function and a cold start, you can see the SQS messages being processed out of order. This is because AWS Lambda sees 5 rapid-fire messages from the SQS event source and probably starts 5 concurrent Lambda functions simultaneously. It shows how AWS Lambda scales for you. Just an interesting thing to note.

The default AWS concurrency limit is 1,000. See: Request a concurrency limit increase for your Lambda function | AWS re:Post

The default concurrency limit per AWS Region is 1,000 invocations at any given time. However, note that new AWS accounts have reduced concurrency and memory quotas.

On brand new AWS accounts, it’s even lower. See: https://twitter.com/donkersgood/status/1737313189174419710

AWS is probably doing this to prevent spammers. On existing AWS accounts, it’s 1,000. These are soft limits; AWS can increase them with a ticket request.

I’ll try the simplified project, but for now I can tell that it’s not about the logs, because I added them later to see why each function is not saving some attribute to the database. It’s just not running, and I’m wondering if I should use something else for this task.
Maybe an SQS queue is just not made to be that reliable, especially if it’s an AWS issue and not a Jets issue.

Oh, I see. So the database attribute was not updated. That means the function was not triggering/running.

RE: Maybe SQS queue is just not made to be so reliable, especially if it’s AWS issue and not Jets issue.

Maybe. Though, I feel that SQS is used enough that if it were that unreliable, people would have been vocal about it. Just my 2 cents.

Wondering if it was the IAM permission issue that I just ran into while working on this: https://github.com/rubyonjets/jets/pull/698

Essentially, when sqs_event ref(:waitlist) is called, it auto-adds a default IAM policy that allows the sqs:... IAM permissions: https://github.com/rubyonjets/jets/blob/31e6a9591b53731d89997bf70759b1c4ec22e74f/lib/jets/job/dsl/sqs_event.rb#L89

However, if you define a custom IAM policy like iam_policy "sns" above the Lambda function, the default is not applied at all.

So you would need something like this:

class CourierJob < ApplicationJob
  iam_policy("sns", "sqs") # needed for Jets 5.0.5 and below
  # with Jets 5.0.6 and above, iam_policy("sns") alone works
  sqs_event ref(:waitlist)
  def process
    # ...
  end
end

I noticed this behavior while debugging why the Lambda function would not run at all. It was an IAM permission issue, and I patched it on the spot. The behavior is more expected now.

When the Lambda function did not have IAM permission to ReceiveMessage from the SQS queue, I noticed no CloudWatch logs. But I also noticed that the SQS Queue would have messages “stuck”.

In the SQS Console, under More, there’s the Messages available information. You can also see this when you click on Send and receive messages. The messages were stuck in the queue because the Lambda function did not have the sqs:ReceiveMessage IAM permission. SQS will do what it does and retain the messages, but it can’t do anything about AWS Lambda lacking IAM permission. IAM permissions can sometimes trip you up.

When you get a chance, wondering if you can try Jets 5.0.6, which includes the PR https://github.com/rubyonjets/jets/pull/698

Re-read and studied your original post more closely. That does seem odd about the timestamped items in the database. I’m wondering if you are inserting records or updating them. If it’s updating, maybe they are overwriting each other. But then the logs should show up… If it’s not currently inserting, would try inserting records with the message recorded and confirm that all the messages exist.

$ jets console
> 5.times { |i| DispatcherJob.perform_now(:dispatch, test: "message #{i+1}") }

Also, would double-check the IAM permissions again. Here’s a command to inspect the generated template after a jets deploy.

cat /tmp/jets/YOUR_APP/templates/app-courier_job.yml | yq '.Resources.CourierJobProcessIamPolicy'

Unsure here. Interested in what you find out.

Here is what I have found so far:

  1. Jets 5.0.6 is not making any difference.
  2. Running DispatcherJob 10 times from the console actually logs 10 events if it’s just test: message as you had it, but when I tried to run it with some real data it started being flaky again. I’ll look into it more.
  3. I don’t know what to do with your IAM permissions tip. I get “zsh: command not found: yq” and I’m not sure what yq is.

I’ll keep trying things; next is running the demo app and making it more complicated step by step so I can find where it breaks (unless you have a better suggestion).

edit:
I installed yq and this is the result:

Type: AWS::IAM::Policy
Properties:
  Roles:
    - Ref: CourierJobProcessIamRole
  PolicyName: mailroom-dev-courier-job-process-policy
  PolicyDocument:
    Version: '2012-10-17'
    Statement:
      - Action:
          - sqs:ReceiveMessage
          - sqs:DeleteMessage
          - sqs:GetQueueAttributes
        Effect: Allow
        Resource: "*"
      - Action:
          - logs:*
        Effect: Allow
        Resource: arn:aws:logs:eu-central...:log-group:/aws/lambda/mailroom-dev-*
      - Action:
          - s3:Get*
          - s3:List*
          - s3:HeadBucket
        Effect: Allow
        Resource: arn:aws:s3:::mailroom-dev-s3bucket...*
      - Action:
          - cloudformation:DescribeStacks
          - cloudformation:DescribeStackResources
        Effect: Allow
        Resource: arn:aws:cloudformation:eu-central...stack/mailroom-dev*

Can’t think of a better suggestion at the moment. Sometimes you’ve just gotta brute force it and make it more complex step-by-step. I’ve used a hammer many times before :grinning_face_with_smiling_eyes:

RE: zsh: command not found: yq

yq documentation. It’s just the YAML version of jq.

Really curious as to what’s going on here. Thanks for the update.

I just edited my response with the result of yq


Cool. The IAM permissions for the Lambda function look fine. IE:

    Statement:
      - Action:
          - sqs:ReceiveMessage
          - sqs:DeleteMessage
          - sqs:GetQueueAttributes
        Effect: Allow
        Resource: "*"

So don’t think it’s IAM permissions.

Next time the database attribute does not update, would try checking the Messages available information in the SQS console. Just want to make sure that the messages are not “stuck” in the queue. I do not believe they are; just double-checking.

Unsure what it is at the moment. Still curious. :eyes:

How do I check Messages available? I don’t even know how to find the SQS console, to be honest :smiley:

Sure.

  1. Go to the AWS Dashboard and search for SQS
  2. Go to SQS Console
  3. Click on the Queue that was created by the Jets app. My app is called testapp, so my queue name looks something like testapp-dev-List-1KOHUASCLZKM6-Waitlist-SOA60VfxzerN. Yours will be prefixed with your app name.
  4. There’s a More/Hide caret option. Click on that to expand it.
  5. The Messages available count is in there. It should say 0. Otherwise, messages have been placed on the Queue but the AWS Lambda function trigger is not processing them.

Usually, I just go from the CloudFormation stacks and click on the Physical ID link. But SQS resources are not clickable in this case.

It’s a bummer that images are currently broken on the forum site :pensive: Unsure when I’ll have time to fix that. Here are screenshot links for now

It’s 0, so nothing curious there.
I even tried to create a DLQ, but with no result (maybe because I don’t know how to set one up, or there just isn’t anything there).
It appears I’ll have to go the long way…

OK, I tested a bit further with the demo and with changes to my project, and I noticed some specifics.

This must have something to do with cold starts, because the behavior changes when the Courier function takes some time to finish. It was most visible when I was sending messages like
x.times { |i| DispatcherJob.perform_now(:dispatch, { email_id: 166+i }) }
and loading those emails with logging of what was happening. If I start with 3-5 messages, all are read and it looks fine, but when I raise x to 20 it starts to miss some messages. If you then run 20 again, it all gets read.

I also noticed that when I was only reading emails from the db it could look oddly precise and show no errors, but when I added an update to the record it started getting flaky again. Adding sleep(0.5) confirmed that it’s about reusing the functions, because with that added it gets really messy. Only some of the functions are called, and there is no pattern to what will happen: you can call it 7 times and get just one log, or next time get 5, and so on.

All in all, I’m not sure what it is, and as you said yourself, it would be strange for SQS to be so unreliable with all the people using it, but here we are…

Figured it out. It was a doozy. It was hard to reproduce; even with 100 calls I couldn’t reproduce it when the calls were in series.

Had to run the requests in Threads. Once I did that, I was able to figure it out.

Details:

Essentially, the event helper methods were only grabbing the first item. SQS can deliver a batch of records in a single Lambda invocation, so the singular helper silently dropped everything after the first record. Deprecated those helper methods; you should use the sqs_event_payloads helper so you don’t miss records when events arrive very fast.
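
A sketch of the updated pattern, iterating over every record in the batch instead of assuming one payload per invocation:

class CourierJob < ApplicationJob
  sqs_event ref(:waitlist)
  def process
    # SQS can deliver several messages in one Lambda invocation,
    # so handle each payload rather than just the first.
    sqs_event_payloads.each do |payload|
      ProcessEmail.new.call(payload)
    end
  end
end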

I also ran into CloudFormation rate limit throttling, so Jets is caching the lookup call now. That resolves that as well.

Now, with the changes, I tested with 1,000 threads, putting records into both DynamoDB and MySQL. Both database tables have exactly 1,000 records after processing.
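
A rough sketch of that kind of threaded stress test, reusing the console pattern from earlier (the count is arbitrary):

threads = 1000.times.map do |i|
  Thread.new { DispatcherJob.perform_now(:dispatch, test: "message #{i + 1}") }
end
threads.each(&:join) # wait for all dispatches to finish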

Was still slightly bothered by the interface. Made the helper names prettier.

They’re now called:

s3_events
sqs_events
sns_events

Note: left the other methods like sqs_event_payloads aliased for now.

Great work @tung!
I still have an issue though, and it’s when I put sleep(0.5) in the process_message method. I’m not sure if it’s something with my setup or still an issue with slower tasks, but I lose most of the messages.

Unsure. Tested with a sleep 0.5 and 1,000 threads and saw all 1,000 messages processed.

Here’s also a video of the debugging.

All 1,000 messages are processed and there are 1,000 database records in the video.


The fix is perfect, and the issue was on my end. I simplified a bit and relied on the logs, but they are not reliable.
This looks great. Thanks for your effort and responsiveness!
