Debug School

rakesh kumar

How to integrate Redis queue in Laravel

Implementation Using Redis cache and Redis Queue (Hybrid Approach) to fetch data

Implementation Using Redis cache and Redis Queue (Hybrid Approach) to store data

Integrate Redis queue to fetch data from database

Implementation Using Redis cache and Redis Queue (Hybrid Approach)

use Illuminate\Support\Facades\Redis;

public function index($id)
{
    // Redis key under which the cached dashboard data is stored
    $redisKey = 'url_dashboard:' . $id;

    // Read the cache in one call; exists() followed by get() can race with key expiry
    $cached = Redis::get($redisKey);
    if ($cached) {
        // Cache hit: decode the stored data and return it
        $cachedData = json_decode($cached, true);

        return response()->json([
            'success' => true,
            'data' => UrlResource::collection($cachedData['urls']),
            'url_count' => $cachedData['url_count'],
            'message' => 'URLs retrieved successfully.',
        ], 200);
    } else {
        // Enqueue a job to process and cache the data
        $this->enqueueDataFetchingJob($id, $redisKey);

        // Respond that data is being processed
        return response()->json([
            'success' => true,
            'data' => [],
            'url_count' => 0,
            'message' => 'Data is being processed. Please try again shortly.',
        ], 200);
    }
}

/**
 * Enqueue a job to fetch URLs and their count.
 *
 * @param int $id
 * @param string $redisKey
 */
private function enqueueDataFetchingJob($id, $redisKey)
{
    // Push a job to the Redis queue
    Redis::lpush('url_data_queue', json_encode([
        'id' => $id,
        'redisKey' => $redisKey,
    ]));
}
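
One thing the controller above does not guard against: every request that arrives while the cache is still cold pushes another copy of the same job. Below is a minimal sketch of one way to avoid that, assuming a short-lived marker key written with `SET ... NX`. The function name `enqueueOnce`, the `:pending` suffix, and the 60-second TTL are illustrative choices and not part of the original code. `$redis` is assumed to be a Laravel Redis connection (for example `Redis::connection()`), whose `set()` accepts the `'EX', ttl, 'NX'` argument style.

```php
<?php

/**
 * Push a job only if no identical job is already pending.
 *
 * Hypothetical helper: $redis is assumed to expose set() and lpush()
 * the way Laravel's Redis connection does.
 */
function enqueueOnce(object $redis, string $queue, int $id, string $redisKey, int $pendingTtl = 60): bool
{
    // SET key value EX ttl NX succeeds only when the marker does not exist yet.
    $acquired = $redis->set($redisKey . ':pending', '1', 'EX', $pendingTtl, 'NX');

    if (!$acquired) {
        // Another request already queued this job; skip the duplicate push.
        return false;
    }

    $redis->lpush($queue, json_encode(['id' => $id, 'redisKey' => $redisKey]));

    return true;
}
```

Inside `index()`, the call to `enqueueDataFetchingJob()` could then become `enqueueOnce(Redis::connection(), 'url_data_queue', $id, $redisKey)`. Because the marker expires on its own, a job that is lost to a worker crash can be queued again once the TTL has passed.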

Worker Method to Process the Queue
Create a background worker script to process the jobs in the Redis queue:

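use App\Models\Link; // assumes the Link model lives in App\Models
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Redis;

public function processUrlDataQueue()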
{
    while ($job = Redis::rpop('url_data_queue')) {
        // Decode the job data
        $jobData = json_decode($job, true);
        $id = $jobData['id'];
        $redisKey = $jobData['redisKey'];

        // Fetch URLs and URL count from the database
        $urls = Link::where('slug_id', $id)->get();
        $urlCount = DB::table('links')->where('admin_id', $id)->count();

        // Log the count for debugging
        Log::info('URL count displayed in dashboard: ' . $urlCount);

        // Prepare data for caching
        $dataToCache = [
            'urls' => $urls->toArray(),
            'url_count' => $urlCount,
        ];

        // Store data in Redis with an 8-hour expiration; setex writes the value and TTL atomically
        Redis::setex($redisKey, 28800, json_encode($dataToCache));
    }
}
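
The loop above ends as soon as `rpop` finds the list empty, so the worker has to be restarted to pick up later jobs. A possible alternative is a blocking pop, sketched below under some assumptions: `$redis` behaves like Laravel's Redis connection, where `brpop()` returns `[listName, payload]` or an empty value on timeout. The function name `runBlockingWorker` and the `$maxIdlePolls` parameter are hypothetical; the latter exists only so the sketch can terminate.

```php
<?php

/**
 * Consume a Redis list with a blocking pop instead of a tight RPOP loop.
 *
 * Hypothetical helper: a real worker would typically loop until it is
 * signalled to stop rather than count idle polls.
 */
function runBlockingWorker(
    object $redis,
    string $queue,
    callable $handler,
    int $timeoutSeconds = 5,
    int $maxIdlePolls = 1
): int {
    $processed = 0;
    $idlePolls = 0;

    while ($idlePolls < $maxIdlePolls) {
        // Wait up to $timeoutSeconds for a job instead of returning immediately.
        $reply = $redis->brpop($queue, $timeoutSeconds);

        if (empty($reply)) {
            $idlePolls++;
            continue;
        }

        $idlePolls = 0;
        $jobData = json_decode($reply[1], true);

        if (!is_array($jobData)) {
            // Malformed payload: skip it rather than crash the worker.
            continue;
        }

        $handler($jobData);
        $processed++;
    }

    return $processed;
}
```

The body of the original `while` loop (database fetch plus cache write) would move into the `$handler` callback. The blocking timeout should stay below the connection's read timeout, otherwise the client may drop the connection while waiting.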

Setting Up the Worker
To run the worker, you can use a simple script or integrate it into your Laravel queue system.

Example Worker Script:

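#!/bin/bash

# queue:work only consumes jobs dispatched through Laravel's queue system
# (such as the ProcessUrlData job described below). It does not read the raw
# JSON entries that Redis::lpush() adds to the url_data_queue list; those are
# handled by processUrlDataQueue().
while true; do
    php artisan queue:work --queue=url_data_queue --sleep=3 --tries=3
done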

Laravel Worker Setup:
Alternatively, you can create a Laravel job and worker. Example:

php artisan make:job ProcessUrlData

Inside the job, implement the logic to process the data and cache it in Redis. Then, queue the job using Laravel's job dispatching system.
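
As a rough illustration of that step, the generated job might look like the sketch below. It reuses the queries and the 8-hour expiry from the worker method shown earlier; the `App\Models\Link` namespace and the `$tries` value are assumptions, and the class only runs inside a Laravel application.

```php
<?php

namespace App\Jobs;

use App\Models\Link; // assumed model namespace
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Redis;

class ProcessUrlData implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /** Number of attempts before the job is marked as failed (assumed value). */
    public $tries = 3;

    /** @var int */
    public $id;

    /** @var string */
    public $redisKey;

    public function __construct(int $id, string $redisKey)
    {
        $this->id = $id;
        $this->redisKey = $redisKey;
    }

    public function handle(): void
    {
        // Same queries as processUrlDataQueue()
        $urls = Link::where('slug_id', $this->id)->get();
        $urlCount = DB::table('links')->where('admin_id', $this->id)->count();

        // Cache the result for 8 hours
        Redis::setex($this->redisKey, 28800, json_encode([
            'urls' => $urls->toArray(),
            'url_count' => $urlCount,
        ]));
    }
}
```

With `QUEUE_CONNECTION=redis` configured, the controller could dispatch it with `ProcessUrlData::dispatch($id, $redisKey)->onQueue('url_data_queue');` in place of the manual `Redis::lpush()` call, and `php artisan queue:work redis --queue=url_data_queue` would then consume it.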

Another Example

Redis Queue Implementation
To enqueue a task for processing the phone numbers in the background:

Updated Code: Queue Implementation

use Illuminate\Support\Facades\Redis;

public function index($id)
{
    // Define a Redis key
    $redisKey = 'phone_numbers:' . $id;

    // Read the cache in one call; exists() followed by get() can race with key expiry
    $cached = Redis::get($redisKey);
    if ($cached) {
        // Cache hit: decode the stored response
        $cachedData = json_decode($cached, true);
        return response()->json([
            'success' => true,
            'data' => $cachedData['data'],
            'message' => $cachedData['message'],
            'count' => $cachedData['count']
        ], 200);
    }

    // Enqueue a task to fetch and cache the data
    $this->enqueuePhoneNumberTask($id, $redisKey);

    // Respond with a processing message
    return response()->json([
        'success' => true,
        'data' => [],
        'message' => 'Data is being processed. Please try again shortly.',
        'count' => 0
    ], 200);
}

/**
 * Enqueue a task to fetch and cache phone numbers.
 *
 * @param int $id
 * @param string $redisKey
 */
private function enqueuePhoneNumberTask($id, $redisKey)
{
    Redis::lpush('phone_number_queue', json_encode([
        'id' => $id,
        'redisKey' => $redisKey,
    ]));
}

Background Worker for Processing the Queue
Create a worker script to process tasks from the Redis queue:

use App\Models\PhoneNumber; // assumes the PhoneNumber model lives in App\Models
use Illuminate\Support\Facades\Redis;

public function processPhoneNumberQueue()
{
    while ($job = Redis::rpop('phone_number_queue')) {
        $jobData = json_decode($job, true);
        $id = $jobData['id'];
        $redisKey = $jobData['redisKey'];

        // Fetch data from the database
        $phoneNumbers = PhoneNumber::where('slug_id', $id)->get();

        // Prepare data for caching
        $response = [
            'success' => true,
            'data' => $phoneNumbers->toArray(),
            'message' => 'PhoneNumber retrieved successfully.',
            'count' => $phoneNumbers->count(),
        ];

        // Cache the data in Redis for 1 hour; setex writes the value and TTL atomically
        Redis::setex($redisKey, 3600, json_encode($response));
    }
}

Implementation of Redis Queue to store data in Laravel

Existing Laravel code

public function store(Request $request)
{
    $input = $request->all();
    Log::info($request);

    $validator = Validator::make($input, [
        'project_name' => 'required',
        // 'fb_likes' => 'required',
        // 'yt_subs' => 'required',
        // 'tw_follower' => 'required',
        // 'insta_follower' => 'required',
    ]);

    if ($validator->fails()) {
        $response = [
            'success' => false,
            'data' => 'Validation Error.',
            'message' => $validator->errors(),
        ];

        return response()->json($response, 422);
    }

    // Look for an existing record for this project and admin
    $social_rank = Socialrank::where('project_name', '=', $input['project_name'])
        ->where('admin_id', '=', $input['admin_id'])
        ->first();

    if ($social_rank === null) {
        Log::info('No existing Socialrank found; creating a new record');

        $socialrank = Socialrank::create([
            'admin_id' => $input['admin_id'],
            'admin_email' => $input['admin_email'],
            'user_email' => $input['user_email'],
            'user_id' => $input['user_id'],
            'project_id' => $input['project_id'],
            'project_name' => $input['project_name'],
            'fb_likes' => $input['fb_likes'] ?? null,
            'yt_subs' => $input['yt_subs'] ?? null,
            'tw_follower' => $input['tw_follower'] ?? null,
            'insta_follower' => $input['insta_follower'] ?? null,
            'slug_id' => $input['u_org_organization_id'],
            'slugname' => $input['u_org_slugname'],
        ]);

        Log::info('Socialrank stored successfully');

        // Delete the cached dashboard data, if any, so the next read rebuilds it
        $redisKey = 'socials_dashboard:' . $input['u_org_organization_id'];
        if (Redis::exists($redisKey)) {
            Redis::del($redisKey);
        }

        $response = [
            'success' => true,
            'data' => new SocialrankResource($socialrank),
            'message' => 'Social Rank stored successfully.',
        ];
    } else {
        $response = [
            'success' => false,
            'data' => '',
            'message' => 'Social Rank Already Exist',
        ];
    }

    return response()->json($response, 200);
}

Full Code After Implementing the Redis Queue
Controller or Service Code

use Illuminate\Support\Facades\Redis;
use Illuminate\Support\Facades\Log;

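public function storeSocialRank($input)
{
    // $input should already be validated (as in store() above): once the task
    // is queued, validation errors can no longer be returned to the user

    // Enqueue the social rank creation task
    Redis::lpush('social_rank_queue', json_encode($input));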

    // Respond immediately to the user
    return response()->json([
        'success' => true,
        'message' => 'Social Rank creation task has been queued for processing.',
    ], 202);
}

Worker to Process the Redis Queue
The worker processes the tasks in the social_rank_queue to safely insert data into the database and handle Redis cache.

use App\Http\Resources\SocialrankResource;
use App\Models\Socialrank;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Redis;

public function processSocialRankQueue()
{
    while ($job = Redis::rpop('social_rank_queue')) {
        $input = json_decode($job, true);

        try {
            // Check if the Social Rank already exists
            $social_rank = Socialrank::where('project_name', '=', $input['project_name'])
                ->where('admin_id', '=', $input['admin_id'])
                ->first();

            if ($social_rank === null) {
                Log::info('Processing Socialrank creation for project: ' . $input['project_name']);

                // Create a new Social Rank entry
                $socialrank = Socialrank::create([
                    'admin_id' => $input['admin_id'],
                    'admin_email' => $input['admin_email'],
                    'user_email' => $input['user_email'],
                    'user_id' => $input['user_id'],
                    'project_id' => $input['project_id'],
                    'project_name' => $input['project_name'],
                    'fb_likes' => $input['fb_likes'] ?? null,
                    'yt_subs' => $input['yt_subs'] ?? null,
                    'tw_follower' => $input['tw_follower'] ?? null,
                    'insta_follower' => $input['insta_follower'] ?? null,
                    'slug_id' => $input['u_org_organization_id'],
                    'slugname' => $input['u_org_slugname'],
                ]);

                Log::info('Socialrank created successfully for project: ' . $input['project_name']);

                // Remove related Redis cache if it exists
                $redisKey = 'socials_dashboard:' . $input['u_org_organization_id'];
                if (Redis::exists($redisKey)) {
                    Redis::del($redisKey);
                    Log::info('Redis cache cleared for key: ' . $redisKey);
                }

                // Log the response
                Log::info('Socialrank stored successfully:', [
                    'data' => new SocialrankResource($socialrank),
                ]);
            } else {
                Log::info('Socialrank already exists for project: ' . $input['project_name']);
            }
        } catch (\Exception $e) {
            // Handle errors
            Log::error('Error processing Socialrank queue:', ['error' => $e->getMessage()]);
        }
    }
}
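
One caveat with the worker above: `rpop` removes the job before it is processed, so a crash between the pop and the database insert loses the task. The sketch below shows the common "reliable queue" pattern as one possible mitigation. It assumes `$redis` is a Laravel Redis connection (where `lrem()` takes `$key, $count, $value`); the function name `processOneReliably` and the `:processing` suffix are illustrative.

```php
<?php

/**
 * Process one job using the "reliable queue" pattern: the payload is moved
 * to a processing list atomically and removed only after the handler succeeds.
 *
 * Hypothetical helper. Returns null when the queue is empty, true on success,
 * false when the handler threw.
 */
function processOneReliably(object $redis, string $queue, callable $handler): ?bool
{
    $processing = $queue . ':processing';

    // Atomically pop from the queue tail and push onto the processing list.
    $payload = $redis->rpoplpush($queue, $processing);

    if ($payload === null || $payload === false) {
        return null; // Queue is empty.
    }

    try {
        $handler(json_decode($payload, true));
    } catch (\Throwable $e) {
        // Leave the payload in the processing list so it can be retried or inspected.
        return false;
    }

    // Success: remove this payload from the processing list.
    $redis->lrem($processing, 1, $payload);

    return true;
}
```

A worker loop could call `processOneReliably(Redis::connection(), 'social_rank_queue', $handler)` until it returns `null`, with the create-and-invalidate logic moved into `$handler`. `RPOPLPUSH` is deprecated since Redis 6.2 in favor of `LMOVE`, but it is still available.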

Running the Worker
Set up a worker script to continuously process the Redis queue:

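#!/bin/bash

# Assumes a custom Artisan command named processSocialRankQueue has been
# registered and calls the worker method above.
while true; do
    php artisan processSocialRankQueue
    sleep 1
done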

Store data using Hybrid Approach

use Illuminate\Support\Facades\Redis;
use Illuminate\Support\Facades\Log;

public function storeSocialRank($input)
{
    // Define a Redis key for caching
    $redisKey = 'social_rank:' . $input['project_name'] . ':' . $input['admin_id'];

    // Read the cache in one call; exists() followed by get() can race with key expiry
    $cached = Redis::get($redisKey);
    if ($cached) {
        // Cache hit: decode the stored social rank
        $cachedData = json_decode($cached, true);

        return response()->json([
            'success' => true,
            'data' => $cachedData,
            'message' => 'Social Rank fetched from cache successfully.',
        ], 200);
    }

    // If not cached, enqueue the task to process and store the social rank
    Redis::lpush('social_rank_queue', json_encode($input));

    // Respond immediately to the user
    return response()->json([
        'success' => true,
        'message' => 'Social Rank creation task has been queued. Data will be available shortly.',
    ], 202);
}

Worker Code
The worker dequeues the task, processes the data, stores it in the database, and updates the Redis cache.

use App\Http\Resources\SocialrankResource;
use App\Models\Socialrank;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Redis;

public function processSocialRankQueue()
{
    while ($job = Redis::rpop('social_rank_queue')) {
        $input = json_decode($job, true);

        try {
            // Check if the Social Rank already exists in the database
            $social_rank = Socialrank::where('project_name', '=', $input['project_name'])
                ->where('admin_id', '=', $input['admin_id'])
                ->first();

            if ($social_rank === null) {
                Log::info('Processing Social Rank creation for project: ' . $input['project_name']);

                // Create a new Social Rank entry
                $socialrank = Socialrank::create([
                    'admin_id' => $input['admin_id'],
                    'admin_email' => $input['admin_email'],
                    'user_email' => $input['user_email'],
                    'user_id' => $input['user_id'],
                    'project_id' => $input['project_id'],
                    'project_name' => $input['project_name'],
                    'fb_likes' => $input['fb_likes'] ?? null,
                    'yt_subs' => $input['yt_subs'] ?? null,
                    'tw_follower' => $input['tw_follower'] ?? null,
                    'insta_follower' => $input['insta_follower'] ?? null,
                    'slug_id' => $input['u_org_organization_id'],
                    'slugname' => $input['u_org_slugname'],
                ]);

                Log::info('Social Rank created successfully for project: ' . $input['project_name']);

                // Cache the newly created social rank in Redis for 1 hour (value and TTL set atomically)
                $redisKey = 'social_rank:' . $input['project_name'] . ':' . $input['admin_id'];
                Redis::setex($redisKey, 3600, json_encode(new SocialrankResource($socialrank)));

                Log::info('Social Rank cached successfully with key: ' . $redisKey);
            } else {
                Log::info('Social Rank already exists for project: ' . $input['project_name']);
            }
        } catch (\Exception $e) {
            Log::error('Error processing Social Rank queue: ' . $e->getMessage());
        }
    }
}

Running the Worker
Run the worker to process tasks from the queue. This can be done via a cron job or a supervisor-managed background process.

Example Bash Script:

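#!/bin/bash

# Assumes a custom Artisan command named processSocialRankQueue has been
# registered and calls the worker method above.
while true; do
    php artisan processSocialRankQueue
    sleep 1
done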

Advantages of Redis Queue

  • Non-blocking API: The request returns as soon as the task is queued, so the user does not have to wait for data processing.
  • Scalability: Several workers can consume the same queue in parallel to handle a large number of tasks.
  • Decoupled Processing: Heavy database operations are offloaded to background workers.

Redis Cache:

use Illuminate\Support\Facades\Redis;

// Store in Redis cache with a 1-hour expiration; setex writes the value and TTL atomically
Redis::setex('user:123', 3600, json_encode(['name' => 'John Doe', 'email' => 'john@example.com']));

Redis Queue:

use Illuminate\Support\Facades\Redis;

// Push task to Redis queue
Redis::lpush('email_queue', json_encode(['to' => 'john@example.com', 'subject' => 'Welcome']));

// Pop task from Redis queue
$task = Redis::rpop('email_queue');
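
To see why pairing `lpush` with `rpop` gives first-in, first-out ordering, here is a small in-memory model of a Redis list. It is purely illustrative: the `InMemoryList` class is hypothetical and no Redis server is involved.

```php
<?php

/**
 * In-memory stand-in for a Redis list, to show why LPUSH + RPOP is FIFO.
 */
final class InMemoryList
{
    /** @var string[] Index 0 is the head (left side) of the list. */
    private array $items = [];

    public function lpush(string $value): int
    {
        array_unshift($this->items, $value); // insert at the head
        return count($this->items);
    }

    public function rpop(): ?string
    {
        return array_pop($this->items); // remove from the tail; null when empty
    }
}

$queue = new InMemoryList();
$queue->lpush('first');
$queue->lpush('second');
$queue->lpush('third');

$order = [];
while (($task = $queue->rpop()) !== null) {
    $order[] = $task;
}

// $order is now ['first', 'second', 'third']: the oldest task comes out first.
```

New tasks enter on the left and workers take from the right, so tasks are handled in arrival order. Pairing `lpush` with `lpop` instead would behave like a stack (last in, first out).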
