Node.jsでRedisをフル活用してみた!

noderedis 

こんにちは! エンジニアインターンの斉藤です。

インターン生二人で行ったプロジェクトの中で、Redisにじっくり触れる機会があったので、そこで得られた知見を記事にしました。

  1. Redisとは?
  2. 前提
  3. 課題1: データ構造
  4. 課題2: データの整合性
  5. 課題3: Pub/Sub
  6. まとめ

Redisとは?

RedisはNoSQL型のインメモリーデータシステムで、キーバリューストアとして動作します。データベースにRedisを採用するメリットとしては主に以下の3つが挙げられます。

・高速でアクセスが可能

メモリ上で動作するデータベース、かつ非リレーショナルデータベースである、という特徴から、高速でデータを読み書きすることができます。

・データの永続的な保持が可能

メモリのデータは揮発性なので、時間が経つとデータは消えてしまいますが、メモリ上のデータをストレージに格納してデータを永続的に保持する機能があります。

データをスナップショットとして保存するRDBとコマンドを記録するAOFによってこれを実現しています。

・複雑なデータ型を保存できる

Redisは単純なKey-Valueだけでなく、さまざまなデータ型が用意されています。これによってRedisで管理できるデータの幅が広がります。

前提

今回、複数のSERP(Search Engine Result Page)分析APIサービスを1つにまとめて利用するためのREST APIを開発しました。

これによって、社内のSERP分析ツールは、単一のインターフェースで複数のSERP分析APIサービスを利用できます。

言語はNode.js、WebアプリケーションフレームワークはExpress.jsを使用しました。

複数のSERP分析APIを扱うにあたって、
  ・APIキーをDBで管理する
  ・レスポンスが早いAPIを優先的に利用する
  ・残りコール可能回数が少ないAPIの利用はできるだけ避ける
という条件があったため、Redisでは
  ・各APIの各アカウントのAPIキー
  ・各アカウントの残りコール可能回数
  ・各APIのレスポンスタイム移動平均
の3つを保存する必要がありました。

今回のプロジェクト詳細についてはこちらの記事をご覧ください。

課題1: データ構造

まず最初に躓いた点は、Redisのデータ構造の設計です。「前提」に示した3つのデータを格納するために、始めは以下のようなデータ構造を考えました。

key field value
serpA-1 apikey xxx0000...
rmcnt 53252
service serpA
serpA-2 apikey xxx1111...
rmcnt 90002
service serpA
serpB-1 apikey xxx2222...
rmcnt 19213
service serpB



key value
resTime-serpA 502
resTime-serpB 334


hash型にAPIキー、残り利用回数、SERPサービス名を格納し、文字列型に各SERPサービスのレスポンスタイムの移動平均を格納する、という考え方です。

しかし、このデータ構造にはある問題点があります。

・データを取り出すのに複数回の操作が必要になる

上に書いた情報を全て取り出す場合、keys * で全てのキーを取得したしたあと、keyから1つずつデータを取り出す必要があるので、1回で全てのデータを取り出すことができません。

さらにkeyの数が多くなるとRedisへのアクセス回数も増えてしまいます。

また、アカウントに「serpA-1」などと固有の名前を付けていますが、APIキーが一意なので、これも不要だと言えます。

この問題を解決したデータ構造が下の表のようになります。

key field value
config rmcnt:serpA:xxxx0000... 53252
rmcnt:serpA:xxxx1111... 90002
rmcnt:serpB:xxx2222... 19213
avgrt:serpA 502
avgrt:serpB 334


hash型と文字列型を混在させることをやめ、全てhash型にまとめ、keyをconfigとしました。

さらに、fieldはコロン区切りにすることで、複数の情報を入れられるようにしました。

これによって、hgetall configという操作1発で全データを取り出せるようになりました。

注)
rmcnt: remaining countsの略
avgrt: average response time の略

課題2: データの整合性

SERPのサービスを利用後、残り利用回数を減らすコードについて、当初は次のように書いていました。

// redis.js

const redis = require('redis')
const client = redis.createClient({url:process.env.REDIS_URL})

// hash型を追加・更新
module.exports.setHash = async function (key, field, value) {
    await client.HSET(key, field, value)
}

// app.js

const redis = require('redis.js')


// 割り当てたSERPサービスの情報を格納
let serp_assigned = await getAssignedSerpInfo()

// SERPサービスから情報を取得
const serp_result = await getSerpResult(serp_assigned)

// 残り回数を減らす
const new_rmcnt = serp_assigned.rmcnt - 1
await redis.setHash('config', `rmcnt:${serp_assigned.service}:${serp_assigned.apiKey}`, new_rmcnt)


getAssignedSerpInfo()
でRedis経由で残り回数を含めた情報を取得したあと、getSerpResult()でSERPサービスから情報を取得し、最後にsetHash()残り回数を更新しています。

今回開発したAPIはECSでデプロイしたのですが、複数のノードでクラスターを構成しています。

そのため、読み込みから書き込みまでの間に他のノードが読み込んでしまうとデータがずれてしまいます。

データずれ説明


getAssignedSerpInfo()
からsetHash()の間までトランザクションを敷けばデータがずれることはありませんが、他のノードに待ち時間が発生し、サービスのパフォーマンスが落ちてしまいます。

これを解決するのが、Redisが提供しているHINCRBYというコマンドです。

現在のhash型のvalueから指定した整数分だけ数を加算することができます。

// redis.js

const redis = require('redis')
const client = redis.createClient({url:process.env.REDIS_URL})

// ハッシュ型のValueを増減
module.exports.hIncrBy = async function (key, field, num) {
    await client.hIncrBy(key, field, num)
}

// app.js

const redis = require('redis.js')


// 割り当てたSERPサービスの情報を格納
let serp_assigned = await getAssignedSerpInfo()

// SERPサービスから情報を取得
const serp_result = await getSerpResult(serp_assigned)

// 残り回数を減らす
await redis.hIncrBy('config', `rmcnt:${serp_assigned.service}:${serp_assigned.apiKey}`, -1)


これで、ノードが保持している残り利用回数に影響されることなくRedisのデータを減算でき、データにずれが生じることはなくなりました。

課題3: Pub/Sub

「課題2」で、ECS上にデプロイし複数のノードでクラスターを構成している、と述べましたが、社員さんから「各ノードの内部IPアドレスと処理したリクエスト数、サーバー稼働時間を返すAPIを実装してほしい」という要望をいただきました。

RedisのPub/Sub機能を使って実装したので、内容を紹介します。

// redis.js

const redis = require('redis')
const client = redis.createClient({url:process.env.REDIS_URL})

//redisとのコネクションを確立
module.exports.openRedis = async function () {
    await client.connect()
}

// server-statusチャネルの購読
module.exports.subscribeServerStatus = async function () {
    const subscriber = client.duplicate()
    await subscriber.connect()
    await subscriber.subscribe('server-status', (message) => {
        if(message === "status-request") {
            this.publish('server-status', JSON.stringify({'numRequest':`${request_count}`, 'upTimeSeconds' : `${Math.floor((Date.now() - server_start_time)/1000)}`, 'privateIP':local_ip}))
        } else {
            server_status_str.push(message)
            status_res_counter.count()
        }
    })
}

// メッセージを発信
module.exports.publish = async function (channel, message) {
    const num_subscriber = await client.publish(channel, message)
    return num_subscriber
}

// app.js

const express = require('express')
const asyncCounter = require('async-counter')
const os = require('os')
const app = express()
const redis = require('redis.js')

async function initServer() {
  await redis.openRedis()
  await redis.subscribeServerStatus()
  request_count = 0
  server_start_time = Date.now()
  local_ip = getLocalAddress()
  console.log('server initialized')
}

new Promise(async (resolve) => {
  await (async function() {
    initServer()
    resolve()
  })()
}).then(() => app.listen(80))

app.get('/search', async (req, res) => {
  request_count += 1
  //略
  res.json(serp_result)
})

app.get('/status', async (req, res) => {
  server_status_str = []
  const num_server = await redis.publish('server-status', "status-request")
  status_res_counter = asyncCounter(num_server)
  await status_res_counter
  const server_status = server_status_str.map((server) => {
    let status_parsed = JSON.parse(server)
    status_parsed["upTimeSeconds"] = Number(status_parsed["upTimeSeconds"])
    status_parsed["num_request"] = Number(status_parsed["num_request"])
    return status_parsed
  })
  res.json({"server": server_status, "account": account_status})
})

// 内部IPを取得
function getLocalAddress() {
  var ifacesObj
  var interfaces = os.networkInterfaces()
  for (var dev in interfaces) {
    interfaces[dev].forEach(function(details){
        if (!details.internal){
          ifacesObj = details.address
        }
    })
  }
  return ifacesObj
}


サーバー起動時にserver-statusチャネルへ購読を行い、/statusが呼ばれると、Redisを介して各ノードへ"status-request"というメッセージを送信します。

それを受け取ったノードはリクエスト処理数、稼働時間、内部IPアドレスを同じチャネルへ流します。

この機能を実装する上で最も頭を悩ませたのは、全てのノードからメッセージが返ってくるまで処理を止めなければならない、ということです。

オートスケーリングによりノードの数は変化し得ますが、チャネルへのサブスクライバー数はpublish()の戻り値として簡単に得られます。

しかし、そこで得られた稼働中のノード数のメッセージを受け取るまでは処理を止めなければなりませんでした。

ここで役に立ったのはasync-counterというパッケージです。status_res_counter = asyncCounter(num_server)で待ちたいカウント数を登録し、 await status_res_counterで処理を止めます。

そして、メッセージを受け取ったらstatus_res_counter.count()でカウントをし、登録したカウント数に達すると処理が再開されます。

これによって、ノードの数がいくつになろうと、全ノードの情報が得られるまで処理を待てるようになりました。

まとめ

最後に今回得られた学びについてまとめます。

・「自由度が高い」という罠


Redisはリレーショナルデータベースのようにスキーマを設定する必要がなく、データ構造の設計はかなり開発者側に委ねられています。

一見、これは楽に思えるかもしれませんが、設計を慎重に行わないと、知らぬ間に多量のアクセスを行い、コストが膨大になったり、パフォーマンスを大きく低下させたりすることに繋がりかねません。

開発ができるだけでなく、コストやパフォーマンスも配慮して開発をできるようになる良いきっかけとなりました。

・提供されている機能は使い倒すべき


自己解決するのに越したことはないですが、「課題2」におけるHINCRBYのように、開発者が抱える問題を解決してくれる機能はしばしばあるものだなと実感しました。

始めからそのような機能があることを知っていればもっと早く完成まで進められたのではないかと思いました。

そのためにも、新しい技術に触れる際は公式ドキュメントを読み込むことを引き続き徹底し、開発を始める前の設計に反映させることを意識したいです。

これからRedisを新しく使い始める方や、導入例を模索している方の参考に少しでもなれば幸いです。

最後まで読んでいただき、ありがとうございました!

※2022年10月27日時点

Techブログ 新着記事一覧