Simple log-parsing with Python 3.5

2016-02-20 15:36

One of the coolest features for me in Python 3.5 is os.scandir - iterating over the contents of a directory. Of course you can use os.listdir in previous versions but this does neither work as a generator nor are the returned entries caching information from stat-calls.

For trying out the new feature I decided to create a simple log-parsing pipeline to find any failures in logs.

#! /usr/bin/env python3

import argparse
import os


def check_folder(folder):
    logs = get_logs(folder)
    failures = collect_failures(logs)
    print_failures(failures)


def get_logs(folder):
    for entry in os.scandir(folder):
        if entry.is_dir():
            yield from get_logs(entry.path)
        elif entry.is_file() and not entry.is_symlink():
            yield entry.path


def collect_failures(paths):
    for filedescriptor in get_file_descriptors(paths):
        yield from get_failure_lines(filedescriptor)


def get_file_descriptors(paths):
    for path in paths:
        if not path.endswith('.log'):
            continue

        with open(path, 'r') as fd:
            yield fd


def get_failure_lines(filedescriptor):
    for linenumber, line in enumerate(filedescriptor):
        if 'failed' in line.lower():
            yield filedescriptor.name, linenumber, line


def print_failures(failures):
    for filename, linenumber, line in failures:
        print('{}, line {}: {}'.format(filename, linenumber, line.strip()))


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('folder')

    args = parser.parse_args()

    check_folder(args.folder)

This is very simple but has the advantage to be easily extendable. Want to support gzip-compressed files or some obscure binary log format? Just return the corresponding file descriptor. Looking for different messages? Just extend the function to yield the findings.

The functions on their own are easy to test because they are not very tight-coupled.

Tags: