ログ解析ツールを作る

ログ解析ツールを作っててノッテきてつぶやいたら、


と言われたのでオプションで指定した条件のログを抽出する簡単なサンプルを作ってみました。あくまで私はこんな感じで実装しましたが、他にもっと良いやり方があったら教えてくださいm(_ _)m

私の使い慣れている言語が Pythonbash シェルスクリプトと C 言語(嘘です、慣れてません!)しかないので、この中で言えば Python しか選択肢がないですよねというのが本当のところでした(^ ^;;

先ずはログ解析のサンプルプログラムを紹介します(要: Python 2.6 以上)。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import codecs
import optparse
import os
import re
import sys

from datetime import datetime

# Globals
VERSION = "0.30"
BREAK_FUNCTIONS = ("_filter_date_to",)
COLUMN_NUM = 6
POS_DATE = 0
POS_RETCODE = 4
POS_MSG = 5

class Null(object):
    def __new__(cls, *args, **kwargs):
        if not hasattr(cls, "_inst"):
            cls._inst = super(Null, cls).__new__(cls)
        return cls._inst
    def __init__(self, *args, **kwargs): pass
    def __call__(self, *args, **kwargs): return self
    def __repr__(self): return "Null()"
    def __nonzero__(self): return False
    def __getattr__(self, name): return self
    def __setattr__(self, name): return self
    def __delattr__(self, name): return self

def parse_option():
    usage = "usage: %prog [option] sample.log"
    ver = "%s %s" % ("%prog", VERSION)
    parser = optparse.OptionParser(usage, version=ver)
    parser.add_option("-f", "--from", dest="date_from",
                      metavar="YYYYMMDDHHMM", help="filter with from date")
    parser.add_option("-t", "--to", dest="date_to",
                      metavar="YYYYMMDDHHMM", help="filter with date to")
    parser.add_option("-q", "--query", dest="query",
                      metavar="STRING", help="filter with string")
    parser.add_option("-o", "--outputfile", dest="output",
                      metavar="OUTPUT_FILE", default="checked.log",
                      help = "output file name")
    parser.add_option("-e", "--error", dest="error",
                      action="store_true", default=False,
                      help="filter with error log message")
    parser.add_option("-v", "--verbose", dest="verbose",
                      action="store_true", default=False,
                      help="print log on standard output")
    
    opts, args = parser.parse_args()
    if args and os.access(args[0], os.R_OK):
        return opts, args
    else:
        print parser.print_help()
        sys.exit(0)

def read_log(path):
    with codecs.open(path, mode="rU", encoding="utf-8") as f:
        for line in f:
            columns = line.split("\t")
            if len(columns) < COLUMN_NUM:
                continue
            yield columns, line.rstrip("\n")

def filter_date_from(date_from):
    """
    >>> filter_date_from("201011232132")(["2010.11.23 21:31:59"])
    False
    >>> filter_date_from("201011232132")(["2010.11.23 21:32:00"])
    True
    >>> filter_date_from("201011232132")(["2010.11.23 21:32:01"])
    True
    """
    _date_from = datetime.strptime(date_from, "%Y%m%d%H%M")
    def _filter_date_from(columns):
        log_date = datetime.strptime(columns[POS_DATE], "%Y.%m.%d %H:%M:%S")
        return _date_from <= log_date
    return _filter_date_from

def filter_date_to(date_to):
    """
    >>> filter_date_to("201011232132")(["2010.11.23 21:32:00"])
    True
    >>> filter_date_to("201011232132")(["2010.11.23 21:32:59"])
    True
    >>> filter_date_to("201011232132")(["2010.11.23 21:33:00"])
    False
    """
    _date_to = datetime.strptime("%s59" % date_to, "%Y%m%d%H%M%S")
    def _filter_date_to(columns):
        log_date = datetime.strptime(columns[POS_DATE], "%Y.%m.%d %H:%M:%S")
        return log_date <= _date_to
    return _filter_date_to

def filter_errlog(columns):
    """
    >>> filter_errlog(["date", "lv", "pg", "pid", "-1", "message"])
    True
    >>> filter_errlog(["date", "lv", "pg", "pid", "0", "message"])
    False
    >>> filter_errlog(["date", "lv", "pg", "pid", "1", "message"])
    False
    """
    return bool(min(0, int(columns[POS_RETCODE])))

def filter_query(query):
    """
    >>> filter_query("fail")([0,0,0,0,0, "failed to extract"])
    'fail'
    >>> filter_query("(START|end) *id\((\d+)\)")([0,0,0,0,0, "start id(33)"])
    'start id(33)'
    >>> filter_query("detarame")([0,0,0,0,0, "normal end"])
    """
    r = re.compile(r"%s" % query, re.IGNORECASE)
    def _filter_query(columns):
        ret = None
        m = r.search(columns[POS_MSG])
        if m:
            ret = m.group()
        return ret
    return _filter_query

def get_filter_result(columns, funcs):
    func = funcs[0]
    result = func(columns)
    if result and funcs[1:]:
        return get_filter_result(columns, funcs[1:])
    return result, func

def get_functions(opts):
    funcs = []
    if opts.date_from:
        funcs.append(filter_date_from(opts.date_from))
    if opts.date_to:
        funcs.append(filter_date_to(opts.date_to))
    if opts.error:
        funcs.append(filter_errlog)
    if opts.query:
        funcs.append(filter_query(opts.query))
    return funcs

def analyze_log(opts, args):
    def _print_debug(line):
        print line
    
    print_debug = Null()
    if opts.verbose:
        print_debug = _print_debug
    
    log_file = args[0]
    out_file = opts.output
    funcs = get_functions(opts)

    with codecs.open(out_file, mode="w", encoding="utf-8") as output:
        for columns, line in read_log(log_file):
            ret, func = get_filter_result(columns, funcs)
            if not ret and func.__name__ in BREAK_FUNCTIONS:
                break
            if ret:
                print_debug(line)
                output.write(line)

def main():
    opts, args = parse_option()
    try:
        analyze_log(opts, args)
    except Exception as err:
        print "something error:", err

if __name__ == "__main__":
    main()

このログ解析ツールが対象としているログフォーマットは、以下のタブ区切りの6カラムから成ります。

時刻 [TAB] レベル [TAB] プログラム [TAB] PID [TAB] 返値 [TAB] メッセージ
サンプル:
2010.11.18 16:56:20 16 prog 27222 0 start : origin(2) : [sample.pst]
2010.11.18 16:56:25 2  prog 27218 0 master: Records: 348 Deleted: 0

実行結果のサンプル。

$ python log_analyzer.py -f 201011181656 -t 201011181656 -e \
                         -q "\[.*\.doc\]" -o doc_only.log -v sample.log
2010.11.18 16:56:31 0 pg 10 -6 Can't extract [sample.DOC], primary [sample.pst]
2010.11.18 16:56:32 0 pg 10 -6 Can't extract [tekitou.doc], primary [sample.pst]
$ cat doc_only.log 
2010.11.18 16:56:31 0 pg 10 -6 Can't extract [sample.DOC], primary [sample.pst]
2010.11.18 16:56:32 0 pg 10 -6 Can't extract [tekitou.doc], primary [sample.pst]

このログ解析ツールサンプルの特徴を挙げてみます。おそらく以下の条件を満たせば、他の言語でも私は嬉しい気持ちになったんだと思います。

  • 言語にある機能
    • split() 関数がある
    • クロージャが実装できる
    • テストが簡単に書ける
  • 標準ライブラリにある機能

split() 関数がある

今どきのスクリプト言語なら当たり前にありますよね。例えば C 言語には標準ライブラリに split() 関数がないのでこの処理を実装するだけでも苦労します*1

クロージャが実装できる

例えば、ログ日付の From をチェックする関数があります。引数で与える date_from はプログラム実行時に決まるので、ログの1行ごとに datetime 型に変換する必要はありません。そんなちょっとした最適化(?)にクロージャを使うとすっきり書けます。

def filter_date_from(date_from):
    _date_from = datetime.strptime(date_from, "%Y%m%d%H%M")
    def _filter_date_from(columns):
        log_date = datetime.strptime(columns[POS_DATE], "%Y.%m.%d %H:%M:%S")
        return _date_from <= log_date
    return _filter_date_from

テストが簡単に書ける

例えば、境界値テストが Python の doctest で簡単に書けます。新たなログ解析のオプションを増やしたいときは filter_xxx() を実装していけば良い作りになっていますが、関数の単体テストが容易に書ける doctest は素晴らしいです。

def filter_date_from(date_from):
    """
    >>> filter_date_from("201011232132")(["2010.11.23 21:31:59"])
    False
    >>> filter_date_from("201011232132")(["2010.11.23 21:32:00"])
    True
    >>> filter_date_from("201011232132")(["2010.11.23 21:32:01"])
    True
    """

"-m doctest" 付きで実行すればすぐにテストできます。

$ python -m doctest log_analyzer.py
(何も出力されなければ全てのテストが成功)

$ python -m doctest log_analyzer.py -v
(-v オプションで詳細出力)
...
4 items passed all tests:
   3 tests in log_analyzer.filter_date_from
   3 tests in log_analyzer.filter_date_to
   3 tests in log_analyzer.filter_errlog
   3 tests in log_analyzer.filter_query
12 tests in 20 items.
12 passed and 0 failed.
Test passed.

コマンドラインオプションの解析が簡単にできる

getopt モジュール をイメージし易いですが、もっと便利なモジュールがあります*2

Python 2.3 以上から optparse モジュール が、2.7 以上からは argparse モジュール があります。argparse は optparse と後方互換性を維持した上で新たな nargs パラメータ(オプションが取る引数の数を扱う)を追加することができなくてモジュールを分割したようです。

...
    parser.add_option("-t", "--to", dest="date_to",
                      metavar="YYYYMMDDHHMM", help="filter with date to")
...

オプションを追加するときは add_option() を追加するだけで良いのでメンテナンスも簡単ですね。

正規表現が使える

re モジュール があります。このサンプルではマッチした結果を何も使っていませんが、マッチした結果を使ってもっと複雑な解析をしたいときに有効です。

def filter_query(query):
    r = re.compile(r"%s" % query, re.IGNORECASE)
    def _filter_query(columns):
        ret = None
        m = r.search(columns[POS_MSG])
        if m:
            ret = m.group()
        return ret
    return _filter_query

コードが長くなるのでサンプルに含めなかったのですが、汎用的にログを活用するなら全て DB 化してしまうのも1つの手段です。Python で使える データの永続化 の方法も多種多用です。私は SQL に慣れているので sqlite3 モジュール を利用しましたが、用途に応じて他の仕組みも試してみたいですね。

*1:C 言語で split 処理を関数化したらコピペの方が良いと言われた - forest book、ポインタのポインタのポインタを使うような関数は良くないと怒られました(> <)

*2:python で SMTP 認証を行ってメールを送信する - forest book、getopt しか知らなかったときに書いたオプションパーサです、optparse の方がずっと使い勝手が良いです