ログ解析ツールを作る
ログ解析ツールを作っててノッテきてつぶやいたら、
ログ解析ツールを python で書いているときの嬉しさを誰かに共有したい、本当に簡潔に書けて便利なんだよ
@t2y 例題とかみてみたいです。
2010-11-18 22:37:54 via TwitBird to @t2y
と言われたのでオプションで指定した条件のログを抽出する簡単なサンプルを作ってみました。あくまで私はこんな感じで実装しましたが、他にもっと良いやり方があったら教えてくださいm(_ _)m
私の使い慣れている言語が Python と bash シェルスクリプトと C 言語(嘘です、慣れてません!)しかないので、この中で言えば Python しか選択肢がないですよねというのが本当のところでした(^ ^;;
先ずはログ解析のサンプルプログラムを紹介します(要: Python 2.6 以上)。
#!/usr/bin/env python # -*- coding: utf-8 -*- import codecs import optparse import os import re import sys from datetime import datetime # Globals VERSION = "0.30" BREAK_FUNCTIONS = ("_filter_date_to",) COLUMN_NUM = 6 POS_DATE = 0 POS_RETCODE = 4 POS_MSG = 5 class Null(object): def __new__(cls, *args, **kwargs): if not hasattr(cls, "_inst"): cls._inst = super(Null, cls).__new__(cls) return cls._inst def __init__(self, *args, **kwargs): pass def __call__(self, *args, **kwargs): return self def __repr__(self): return "Null()" def __nonzero__(self): return False def __getattr__(self, name): return self def __setattr__(self, name): return self def __delattr__(self, name): return self def parse_option(): usage = "usage: %prog [option] sample.log" ver = "%s %s" % ("%prog", VERSION) parser = optparse.OptionParser(usage, version=ver) parser.add_option("-f", "--from", dest="date_from", metavar="YYYYMMDDHHMM", help="filter with from date") parser.add_option("-t", "--to", dest="date_to", metavar="YYYYMMDDHHMM", help="filter with date to") parser.add_option("-q", "--query", dest="query", metavar="STRING", help="filter with string") parser.add_option("-o", "--outputfile", dest="output", metavar="OUTPUT_FILE", default="checked.log", help = "output file name") parser.add_option("-e", "--error", dest="error", action="store_true", default=False, help="filter with error log message") parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False, help="print log on standard output") opts, args = parser.parse_args() if args and os.access(args[0], os.R_OK): return opts, args else: print parser.print_help() sys.exit(0) def read_log(path): with codecs.open(path, mode="rU", encoding="utf-8") as f: for line in f: columns = line.split("\t") if len(columns) < COLUMN_NUM: continue yield columns, line.rstrip("\n") def filter_date_from(date_from): """ >>> filter_date_from("201011232132")(["2010.11.23 21:31:59"]) False >>> filter_date_from("201011232132")(["2010.11.23 21:32:00"]) True >>> filter_date_from("201011232132")(["2010.11.23 21:32:01"]) True """ _date_from = datetime.strptime(date_from, "%Y%m%d%H%M") def _filter_date_from(columns): log_date = datetime.strptime(columns[POS_DATE], "%Y.%m.%d %H:%M:%S") return _date_from <= log_date return _filter_date_from def filter_date_to(date_to): """ >>> filter_date_to("201011232132")(["2010.11.23 21:32:00"]) True >>> filter_date_to("201011232132")(["2010.11.23 21:32:59"]) True >>> filter_date_to("201011232132")(["2010.11.23 21:33:00"]) False """ _date_to = datetime.strptime("%s59" % date_to, "%Y%m%d%H%M%S") def _filter_date_to(columns): log_date = datetime.strptime(columns[POS_DATE], "%Y.%m.%d %H:%M:%S") return log_date <= _date_to return _filter_date_to def filter_errlog(columns): """ >>> filter_errlog(["date", "lv", "pg", "pid", "-1", "message"]) True >>> filter_errlog(["date", "lv", "pg", "pid", "0", "message"]) False >>> filter_errlog(["date", "lv", "pg", "pid", "1", "message"]) False """ return bool(min(0, int(columns[POS_RETCODE]))) def filter_query(query): """ >>> filter_query("fail")([0,0,0,0,0, "failed to extract"]) 'fail' >>> filter_query("(START|end) *id\((\d+)\)")([0,0,0,0,0, "start id(33)"]) 'start id(33)' >>> filter_query("detarame")([0,0,0,0,0, "normal end"]) """ r = re.compile(r"%s" % query, re.IGNORECASE) def _filter_query(columns): ret = None m = r.search(columns[POS_MSG]) if m: ret = m.group() return ret return _filter_query def get_filter_result(columns, funcs): func = funcs[0] result = func(columns) if result and funcs[1:]: return get_filter_result(columns, funcs[1:]) return result, func def get_functions(opts): funcs = [] if opts.date_from: funcs.append(filter_date_from(opts.date_from)) if opts.date_to: funcs.append(filter_date_to(opts.date_to)) if opts.error: funcs.append(filter_errlog) if opts.query: funcs.append(filter_query(opts.query)) return funcs def analyze_log(opts, args): def _print_debug(line): print line print_debug = Null() if opts.verbose: print_debug = _print_debug log_file = args[0] out_file = opts.output funcs = get_functions(opts) with codecs.open(out_file, mode="w", encoding="utf-8") as output: for columns, line in read_log(log_file): ret, func = get_filter_result(columns, funcs) if not ret and func.__name__ in BREAK_FUNCTIONS: break if ret: print_debug(line) output.write(line) def main(): opts, args = parse_option() try: analyze_log(opts, args) except Exception as err: print "something error:", err if __name__ == "__main__": main()
このログ解析ツールが対象としているログフォーマットは、以下のタブ区切りの6カラムから成ります。
時刻 [TAB] レベル [TAB] プログラム [TAB] PID [TAB] 返値 [TAB] メッセージ サンプル: 2010.11.18 16:56:20 16 prog 27222 0 start : origin(2) : [sample.pst] 2010.11.18 16:56:25 2 prog 27218 0 master: Records: 348 Deleted: 0
実行結果のサンプル。
$ python log_analyzer.py -f 201011181656 -t 201011181656 -e \ -q "\[.*\.doc\]" -o doc_only.log -v sample.log 2010.11.18 16:56:31 0 pg 10 -6 Can't extract [sample.DOC], primary [sample.pst] 2010.11.18 16:56:32 0 pg 10 -6 Can't extract [tekitou.doc], primary [sample.pst] $ cat doc_only.log 2010.11.18 16:56:31 0 pg 10 -6 Can't extract [sample.DOC], primary [sample.pst] 2010.11.18 16:56:32 0 pg 10 -6 Can't extract [tekitou.doc], primary [sample.pst]
このログ解析ツールサンプルの特徴を挙げてみます。おそらく以下の条件を満たせば、他の言語でも私は嬉しい気持ちになったんだと思います。
クロージャが実装できる
例えば、ログ日付の From をチェックする関数があります。引数で与える date_from はプログラム実行時に決まるので、ログの1行ごとに datetime 型に変換する必要はありません。そんなちょっとした最適化(?)にクロージャを使うとすっきり書けます。
def filter_date_from(date_from): _date_from = datetime.strptime(date_from, "%Y%m%d%H%M") def _filter_date_from(columns): log_date = datetime.strptime(columns[POS_DATE], "%Y.%m.%d %H:%M:%S") return _date_from <= log_date return _filter_date_from
テストが簡単に書ける
例えば、境界値テストが Python の doctest で簡単に書けます。新たなログ解析のオプションを増やしたいときは filter_xxx() を実装していけば良い作りになっていますが、関数の単体テストが容易に書ける doctest は素晴らしいです。
def filter_date_from(date_from): """ >>> filter_date_from("201011232132")(["2010.11.23 21:31:59"]) False >>> filter_date_from("201011232132")(["2010.11.23 21:32:00"]) True >>> filter_date_from("201011232132")(["2010.11.23 21:32:01"]) True """
"-m doctest" 付きで実行すればすぐにテストできます。
$ python -m doctest log_analyzer.py (何も出力されなければ全てのテストが成功) $ python -m doctest log_analyzer.py -v (-v オプションで詳細出力) ... 4 items passed all tests: 3 tests in log_analyzer.filter_date_from 3 tests in log_analyzer.filter_date_to 3 tests in log_analyzer.filter_errlog 3 tests in log_analyzer.filter_query 12 tests in 20 items. 12 passed and 0 failed. Test passed.
コマンドラインオプションの解析が簡単にできる
getopt モジュール をイメージし易いですが、もっと便利なモジュールがあります*2。
Python 2.3 以上から optparse モジュール が、2.7 以上からは argparse モジュール があります。argparse は optparse と後方互換性を維持した上で新たな nargs パラメータ(オプションが取る引数の数を扱う)を追加することができなくてモジュールを分割したようです。
... parser.add_option("-t", "--to", dest="date_to", metavar="YYYYMMDDHHMM", help="filter with date to") ...
オプションを追加するときは add_option() を追加するだけで良いのでメンテナンスも簡単ですね。
正規表現が使える
re モジュール があります。このサンプルではマッチした結果を何も使っていませんが、マッチした結果を使ってもっと複雑な解析をしたいときに有効です。
def filter_query(query): r = re.compile(r"%s" % query, re.IGNORECASE) def _filter_query(columns): ret = None m = r.search(columns[POS_MSG]) if m: ret = m.group() return ret return _filter_query
コードが長くなるのでサンプルに含めなかったのですが、汎用的にログを活用するなら全て DB 化してしまうのも1つの手段です。Python で使える データの永続化 の方法も多種多用です。私は SQL に慣れているので sqlite3 モジュール を利用しましたが、用途に応じて他の仕組みも試してみたいですね。
*1:C 言語で split 処理を関数化したらコピペの方が良いと言われた - forest book、ポインタのポインタのポインタを使うような関数は良くないと怒られました(> <)
*2:python で SMTP 認証を行ってメールを送信する - forest book、getopt しか知らなかったときに書いたオプションパーサです、optparse の方がずっと使い勝手が良いです