チュートリアル - LogReaderコンポーネントの作成

  1. 概要
  2. LogAPIについて
  3. JavaInterpreterコンポーネントによるログの読込み
  4. ウィザードの実行
  5. カスタムコンポーネントの作成
    1. 生成物の確認
    2. buildの実行
    3. jarファイルのインストール
    4. 動作確認
  6. LogReaderコンポーネントの改良
    1. フィールド定義の固定
    2. ループ対応
      1. デザイナー側 - 定義ファイルの修正
      2. サーバー側 - ソースコードの修正
    3. その他の改良

1 概要

コンポーネント作成のチュートリアルとして、ここではフローサービスのログファイルを読み込むコンポーネントを作成してみたいと思います。

2 LogAPIについて

フローサービスのログはLogReaderというクラスで読み込むことができます。
これはJavaのBufferedReaderクラスのようにストリームを読み込むクラスで、BufferedReaderが1行分の文字列を読んでStringを返すのに対し LogReaderは1行分のログを読み込んでLogItemというオブジェクトを返します。
LogItemは1行分のログをラップしたクラスです。(ひとつのログが複数行にまたがる場合でもLogItemは正確にログを構造化できます。)

LogReaderの典型的な使い方は次のようになります。

File logFile;
LogReader reader = new LogReader(logFile);
try {
    LogItem item = reader.read();
    while (item != null) {
        //Do something
        item = reader.read();
    }
} finally {
    reader.close();
}

3 JavaInterpreterコンポーネントによるログの読込み

まずはJavaInterpreterコンポーネントを使ってログをレコード化したストリームを出力してみます。
「Start > JavaInterpreter > EndResponse」というフローを作成して、以下のコードをJavaInterprterのソースとして作成してください。

import com.infoteria.asteria.flowlibrary2.stream.StreamFactoryRecord;
import com.infoteria.asteria.log.parser.LogItem;
import com.infoteria.asteria.log.parser.LogReader;
import com.infoteria.asteria.value.Value;
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.ArrayList;

ArrayList list = new ArrayList();
SimpleDateFormat format = new SimpleDateFormat(LogItem.getDateFormat());

LogReader reader = new LogReader(new File(component.getParameter("FilePath").strValue()));
try {
    LogItem item = reader.read();
    while (item != null) {
        Value[] values = new Value[6];
        values[0] = new Value(item.getLineNumber());//行番号
        values[1] = new Value(format.parse(item.getDateStr()));//日時
        values[2] = new Value(item.getLevel());//ログレベル
        values[3] = new Value(item.getCategory());//カテゴリー名
        values[4] = new Value(item.getThreadName());//スレッド名
        values[5] = new Value(item.getMessage());//メッセージ
        
        list.add(values);
        item = reader.read();
    }
} finally {
    reader.close();
}
StreamFactoryRecord factory = (StreamFactoryRecord)component.getOutputConnector().getStreamFactory();
return factory.create(list);

上のコードでは「FilePath」というパラメーターからログファイル名を取得し、そこからログを読み込んでRecordストリームを作成しています。
つまりこのコードを実行するためにはさらにJavaInterpreterコンポーネントの設定として、

する必要があります。
フィールド定義についてはストリームペインの右クリックメニューから「CSV形式で編集」を選択して、以下の定義を貼り付けてください。

LineNo,Integer
Date,DateTime
LogLevel,String
Category,String
Thread,String
Message,String

ここまででJavaInterpreter自体の設定は完了です。
実行するためにFilePathパラメーターに適当なログファイル名を設定して実行してみてください。

注意
ここで指定するログファイルはできるだけサイズの小さなものにしてください。
JavaInterpreterの実行速度は速くないですし、文法チェックのためにコンパイル時にも一度実行されるのでサイズの大きなファイルを指定するとコンパイルにも時間がかかることになります。

4 ウィザードの実行

動作が確認できたらJavaInterpreterの右クリックメニュー「カスタムコンポーネントの作成」からこのコンポーネントをカスタムコンポーネント化してみます。

基本設定(1ページ目)
コンポーネント名 LogReader
表示名 (なし)
アイコン logreader.png(SDKのsampleフォルダにあります。)
初期表示タブ SDKテスト
ツールチップ フローサービスのログを読み込みます。

名前やツールチップは自由に付けていただいて構いません。
またあとから定義ファイルやソースコードを直接変更することもできます。

Javaクラス設定(2ページ目)
クラス名 LogReaderComponent
パッケージ名 com.infoteria.sample.component
保存先フォルダ (任意のフォルダ)

サンプルのjarファイル内には上記のパッケージ名とクラス名が使用されていますが、 自分で作成される場合はJavaのパッケージ名とクラス名は変更した方が良いでしょう。

入力ストリーム設定(3ページ目)
受け入れ可能なストリーム数 -1
(他は省略)

このコンポーネントは入力ストリームを使わないのでこのページはすべて初期値のまま次のページに進んでも良いんですが、 入力ストリームを使わないコンポーネントでは入力数を無制限にしておくのがセオリーです。
(FileGetコンポーネントやRDBGetコンポーネントも入力側では複数のリンクを接続できるようになっています。)

出力ストリーム設定(4ページ目)
出力可能なストリーム Record
出力ストリームの初期値 Record

JavaInterpreterの出力ストリーム定義がレコードになっているので、この画面では最初からRecordストリームのみが選択されています。
このままの状態で次のページに進みます。

プロパティ設定(5ページ目)
プロパティ名 FilePath
表示名 ログファイル名
string
エディタ (なし)
デフォルト値 (なし)
必須 true
ツールチップ ログファイル名を指定します。
マッピング both

JavaInterpreterのパラメーターの設定が画面に反映されています。
デフォルト値にはテストで使用したログのファイル名が設定されていると思うのでそれは削除してください。
表示名には日本語でのプロパティ名を設定し、必須項目なので必須の列をtrueに変更します。
ツールチップにはプロパティの説明を適当に入力してください。

「ダブルクリック時にプロパティエディタを起動する」のチェックはつけてもつけなくても良いです。
プロパティ値の設定にファイル選択ダイアログ(remoteFile型プロパティ)が使えると便利だったんですが、ログファイルは フローサービスのユーザーフォルダー以下にあるものではないので、単純なstring型のプロパティとしています。
そのためコンポーネントのダブルクリックでエディタが起動してもそれほど便利ではないかもしれません。

ソース設定(6ページ目)
(省略)

JavaInterpreterで設定したソースコードが画面に反映されています。
特にソースの変更は必要ありませんが、必要なimport文はすべて記述してあるので「高度な設定」の「BeanShellのデフォルトのimport文を含める」のチェックは外しておきましょう。

ビルド設定(7ページ目)
build.xmlを生成する チェック
jarファイル名 logreader.jar
バージョン番号 1.0
すぐにビルドを実行 オフ
ASTERIA WARPサーバーのパス [INSTALL_DIR]を指定

マシンにANTがインストールされているなら即時実行することもできますが、ここではあとからコマンドラインでビルドすることにします。
同一マシン上にASTERIA WARPサーバーがインストールされているのであればここでそのパスを指定しておくとANTのbuild.xmlに作成したコンポーネントjarファイルの コピータスクとフローサービスの再起動タスクが組み込まれるので今後の開発サイクルが多少楽になります。

5 カスタムコンポーネントの作成

5.1 生成物の確認

ここまでの手順の実行が終わったらまずはウィザード2ページ目で指定したフォルダで生成物を確認してみます。
そこには次のファイルがあります。

これらのファイルがカスタムコンポーネントを作成/実行するために必要なファイル一式となります。
(ただしコンポーネントのヘルプファイルは含まれていないのでデザイナーからヘルプを表示することはできません。)

ついでにjavaファイルとxscファイルの中身も確認して置きます。

Javaファイル

package com.infoteria.sample.component;

import org.w3c.dom.Document;

import com.infoteria.asteria.flowengine2.execute.ExecuteContext;
import com.infoteria.asteria.flowengine2.flow.InputConnector;
import com.infoteria.asteria.flowengine2.flow.OutputConnector;
import com.infoteria.asteria.flowlibrary2.FlowException;
import com.infoteria.asteria.flowlibrary2.component.ComponentException;
import com.infoteria.asteria.flowlibrary2.component.SimpleComponent;
import com.infoteria.asteria.flowlibrary2.property.*;
import com.infoteria.asteria.flowlibrary2.stream.StreamDataObject;
import com.infoteria.asteria.flowlibrary2.stream.StreamException;
import com.infoteria.asteria.flowlibrary2.stream.StreamFactory;
import com.infoteria.asteria.flowlibrary2.stream.StreamFactoryText;
import com.infoteria.asteria.flowlibrary2.stream.StreamFactoryXML;
import com.infoteria.asteria.flowlibrary2.stream.StreamType;
import com.infoteria.asteria.value.Value;
import com.infoteria.asteria.flowlibrary2.stream.StreamFactoryRecord;
import com.infoteria.asteria.log.parser.LogItem;
import com.infoteria.asteria.log.parser.LogReader;
import com.infoteria.asteria.value.Value;
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.ArrayList;

public class LogReaderComponent extends SimpleComponent
{
    public static final String COMPONENT_NAME = "LogReader";
    public String getComponentName() { return COMPONENT_NAME;}

    private StringProperty _propFilePath = new StringProperty("FilePath", true, true);

    public LogReaderComponent()
    {
        int ia = StreamType.ALL;
        int oa = StreamType.RECORDS;
        getInputConnector().setAcceptType(ia);
        getInputConnector().setAcceptLinkCount(-1);
        getOutputConnector().setAcceptType(oa);

        registProperty(_propFilePath);
    }

    public boolean execute(ExecuteContext context) throws FlowException {
        Object result = doExecute(context, this);
        StreamFactory factory = getOutputConnector().getStreamFactory();
        StreamDataObject os = null;
        if (result == null)
            os = factory.createEmptyStream();
        else if (result instanceof byte[])
            os = factory.create((byte[])result);
        else if (result instanceof String) {
            if (!(factory instanceof StreamFactoryText))
                throw new ComponentException("Invalid return value: " + result);
            os = ((StreamFactoryText)factory).create((String)result);
        } else if (result instanceof StreamDataObject) {
            os = (StreamDataObject)result;
            if (factory.getType() != os.getType())
                throw new ComponentException("Invalid return value: " + result);
        } else if (result instanceof Document) {
            if (!(factory instanceof StreamFactoryXML))
                throw new ComponentException("Invalid return value: " + result);
            os = ((StreamFactoryXML)factory).create((Document)result);
        } else
            throw new ComponentException("Invalid return value: " + result);
        setOutputStream(os);
        return true;
    }
    
    private Object doExecute(ExecuteContext context, LogReaderComponent component) throws FlowException {
        try {
            ArrayList list = new ArrayList();
            SimpleDateFormat format = new SimpleDateFormat(LogItem.getDateFormat());
            
            LogReader reader = new LogReader(new File(component.getParameter("FilePath").strValue()));
            try {
                LogItem item = reader.read();
                while (item != null) {
                    Value[] values = new Value[6];
                    values[0] = new Value(item.getLineNumber());
                    values[1] = new Value(format.parse(item.getDateStr()));
                    values[2] = new Value(item.getLevel());
                    values[3] = new Value(item.getCategory());
                    values[4] = new Value(item.getThreadName());
                    values[5] = new Value(item.getMessage());
                    
                    list.add(values);
                    item = reader.read();
                }
            } finally {
                reader.close();
            }
            StreamFactoryRecord factory = (StreamFactoryRecord)component.getOutputConnector().getStreamFactory();
            return factory.create(list);
        } catch (Exception e) {
            throw new ComponentException(e);
        }
    }
    
    // Utility Method
    public Value getParameter(String name)
    {
        Property prop = getProperty(name);
        if (prop instanceof ValueProperty)
            return ((ValueProperty)prop).getValue();
        else
            return null;
    }
    

    public InputConnector getInputConnector()
    {
        return super.getInputConnector();
    }

    public OutputConnector getOutputConnector()
    {
        return super.getOutputConnector();
    }

    public void setOutputStream(StreamDataObject stream)
        throws StreamException
    {
        super.setOutputStream(stream);
    }

    public void setOutputStream(StreamDataObject stream, boolean isSetProperties)
        throws StreamException
    {
        super.setOutputStream(stream, isSetProperties);
    }
}

ある程度Javaに詳しい人ならわかると思いますが、このコードはいくらか冗長です。
doExecuteメソッド内部ではStreamFactoryRecord#createの返り値(=StreamDataObject)を返しているのに、 呼び出し側では返り値の型チェックをして分岐しています。

またコードの後半には使用していない不要なメソッド(Componentクラスのprotectedメソッドをpublicに公開した だけのメソッド)が存在しています。

これらはJavaInterpreterで記述したコードを可能な限りそのまま使うための処置なので不要な部分は全部削って しまって良いんですが、とりあえずはこのまま置いておきます。
(このコードはこのままコンパイルできます。)

xscファイル


<?xml version="1.0" encoding="utf-8"?>
<ComponentDefine>
    <xsc xmlns="http://www.infoteria.com/asteria/flowengine/definition">
        <Component category="SDKテスト" icon="logreader.png" name="LogReader"  toolTip="フローサービスのログを読み込みます">
            <Class>com.infoteria.sample.component.LogReaderComponent</Class>
            <Property mapping="false" name="Exception" type="exception"/>
            
            <Property name="FilePath"  displayName="ログファイル名"  type="string"    tooltip="ログファイルを指定します。"  required="true"  ></Property>
            <Input accept="Binary;Text;HTML;ParameterList;Record;CSV;FixedLength;MIME;XML" default="Binary"  count="unbounded"  />
            <Output  accept="Record" default="Record"  />
        </Component>
    </xsc>
    <xsc lang="ja" xmlns="http://www.infoteria.com/asteria/flowengine/definition">
        <Component category="SDKテスト" icon="logreader.png" name="LogReader"  toolTip="フローサービスのログを読み込みます">
            <Class>com.infoteria.sample.component.LogReaderComponent</Class>
            <Property mapping="false" name="Exception" type="exception"/>
            
            <Property name="FilePath"  displayName="ログファイル名"  type="string"    tooltip="ログファイルを指定します。"  required="true"  ></Property>
            <Input accept="Binary;Text;HTML;ParameterList;Record;CSV;FixedLength;MIME;XML" default="Binary"  count="unbounded"  />
            <Output  accept="Record" default="Record"  />
        </Component>
    </xsc>
</ComponentDefine>

一つ目のxsc要素が英語版の定義で、二つ目の「lang="ja"」がついているxsc要素が日本語版の定義です。

ウィザードでは各項目を日本語版用、英語版用にわけて入力させてはいないのでどちらにも同じ内容が反映されています。
通常は日本語版としてウィザードを実行すると思うので、英語版の方は必要に応じて日本語部分を修正してください。

5.2 buildの実行

生成物の確認が終わったら早速ビルドを実行してみます。
コマンドプロンプトでbuild.xmlのあるフォルダまで移動して、

ant dist

を実行します。
ここまでの手順に誤りがなければビルドに成功するはずです。
(Javacに警告が出力されますがこれはあまり気にする必要がありません。これはJDK8.0でArrayListを型チェックなしで使用しているからで、 気になるようならbuild.xmlのjavac要素に「source="1.4"」をつけるかソースコードを修正してください。)

ビルドに成功した場合distフォルダにjarファイルができています。

5.3 jarファイルのインストール

作成したカスタムコンポーネントをフローサービスで実行するためにはjarファイルを次の2箇所にコピーします。

サーバー、デザイナー共にコピー後には再起動が必要です。

ASTERIA WARPサーバーが同一マシン上にあり、ウィザード7ページ目でそのパスを指定しているならばjarファイルのコピーと フローサービスの再起動もANTタスクとして行えます。

ant cp
ant restart

5.4 動作確認

jarファイルのコピーとサーバー再起動が終わったらデザイナーをたちあげて、作成したカスタムコンポーネントをテストしてみます。
jarファイルのコピー後に最初にデザイナーを起動するとダイアログで新しいコンポーネントが追加されたことが通知されます。

ここで「Start > LogReader > EndResponse」のような簡単なフローで動作確認してみてください。
この時に注意が必要なのはLogReaderコンポーネントでストリームのフィールド定義が必要な点です。
ここではとりあえずJavaInterpreterのテストで使用したフィールド定義をコピーしてきて「CSV形式で編集」で貼り付けてください。

現時点ではソースコード側では出力ストリームは6列のRecordストリーム決めうちになっていますが、 定義ファイル側では「出力ストリームはRecord型」となっているだけなので、フィールド定義はフローを作成する人が自分で行わなければなりません。
このような場合フィールド定義を定義ファイルで固定することもできるので、それは次のステップで対応します。

実行の結果はJavaInterpreterでテストした時と同じになりますが、JavaInterpreterよりもかなり高速に動作することが確認できたと思います。
インタープリタ形式のJavaInterpreterでは実行ステップ数が増えるとリニアにパフォーマンスに跳ね返ってくるので、ファイルサイズが大きくなればなるほど パフォーマンス差は拡大します。

ここまでで一通りのカスタムコンポーネント開発手順の説明は完了です。
あとはひたすら「ソースコード(定義ファイル)修正 > ビルド > インストール > 動作確認」という手順を繰り返していくことになります。
次章以降ではもう少しこのコンポーネントに機能追加してみたいと思います。

6 LogReaderコンポーネントの改良

6.1 フィールド定義の固定

まずはコンポーネントのフィールド定義をコンポーネントユーザーが自分で定義する必要がないように固定してみます。
これはxscファイルのOutput要素にフィールド定義を付け加えることで実現できます。


<Output accept="Record">
    <FieldDef readonly="true">
        <Field name="LineNumber" type="Integer"/>
        <Field name="Date" type="DateTime"/>
        <Field name="LogLevel" type="String"/>
        <Field name="Category" type="String"/>
        <Field name="Thread" type="String"/>
        <Field name="Message" type="String"/>
    </FieldDef>
</Output>

詳細な説明は定義ファイルリファレンスに譲りますが、Outputの定義をこのように変更することで コンポーネントをパレットからドロップした状態で上記のフィールド定義があり、かつ変更不可な状態になります。

ちなみに修正がこのように定義ファイルの変更だけで可能な場合はテストはjarファイルをフローデザイナーにコピーするだけでも行えます。
(定義ファイルの情報は極一部を除いてサーバー側では使用されないのでほとんどの場合再起動は不要です。)

6.2 ループ対応

ログファイルは巨大になることが多いので今のように全部を読み込むことは重い処理になりますし、またOutOfMemoryの危険をはらんでいます。
そこでRDBGetコンポーネントなどと同様にLoopProcessプロパティを持たせて1行ずつループで処理するか、まとめて処理するかを選択できるようにしてみます。

6.2.1 デザイナー側 - 定義ファイルの修正

まずはサーバー側のJavaコードの実装は置いておいて定義ファイルだけを修正してデザイナーの動作を確認してみます。

ループの有無を選択するプロパティを追加する場合はloopProcess型のプロパティを使用します。
これは「はい/いいえ」の二択のプロパティ - すなわちbooleanプロパティの一種ではあるんですが、 値の変更に連動してループアイコンの状態が切り替わる所だけが通常のbooleanプロパティとは異なります。

このコンポーネントではこのプロパティのデフォルト値は「true」であることにします。
つまりループ処理をデフォルトとします。

ループ処理のデフォルトが「false」の場合はProperty要素を追加するだけで良いんですが、「true」とする場合は Output要素に「loop="true"」という属性を加える必要があります。
定義ファイルはコンポーネントの初期状態(ドロップ時点の状態)を表すものなので、これを忘れると初期状態で 「LoopProcess=trueであるにも関わらずアイコンはループになっていない」という状態になります。
(インスペクタでLoopProcessプロパティを変更すれば正しい状態に戻ります。)

以下に変更したxscファイル(日本語版部分のみ)をあげておきます。

    <xsc lang="ja" xmlns="http://www.infoteria.com/asteria/flowengine/definition">
        <Component category="SDKテスト" icon="logreader.png" name="LogReader"  toolTip="フローサービスのログを読み込みます">
            <Class>com.infoteria.sample.component.LogReaderComponent</Class>
            <Property mapping="false" name="Exception" type="exception"/>
            
            <Property name="FilePath"  displayName="ログファイル名"  type="string"    tooltip="ログファイルを指定します。"  required="true"  ></Property>
            <Property name="LoopProcess"  displayName="ループを開始"  type="loopProcess"    tooltip="ループを開始します。">true</Property>
            <Input accept="Binary;Text;HTML;ParameterList;Record;CSV;FixedLength;MIME;XML" default="Binary"  count="unbounded"  />
            <Output accept="Record" loop="true">
                <FieldDef readonly="true">
                    <Field name="LineNumber" type="Integer"/>
                    <Field name="Date" type="DateTime"/>
                    <Field name="LogLevel" type="String"/>
                    <Field name="Category" type="String"/>
                    <Field name="Thread" type="String"/>
                    <Field name="Message" type="String"/>
                </FieldDef>
            </Output>
        </Component>
    </xsc>

この状態でデザイナーを起動し、「ループを開始」のプロパティを変更するとそれに応じてループのアイコンが切り替わるのがわかると思います。
ちなみにこの状態でフローを保存(コンパイル)するとLoopProcessプロパティが見つかりませんというエラーになります。
それはもちろんサーバー側での実装がまだ行われていないためです。

6.2.2 サーバー側 - ソースコードの修正

次にサーバー側でのソースコードの修正を行います。
必要な修正の概要は次のようになります。

LoopProcessプロパティの追加
まずはコンポーネントにLoopProcessというプロパティを追加します。
デザイナー側では「loopProcess」というプロパティ型を使用しましたが、サーバー側では単なるBooleanPropertyであることに注意してください。
loopProcess型はデザイナーで通常のboolean型プロパティにループアイコンを切り替える機能を加えられただけのものなのでサーバー側では必要ありません。
LoopPossibilityメソッドの実装
ループする可能性のあるコンポーネントでは、自分がループの起点になることがあるということをコンパイラに通知する必要があります。
そのためのメソッドがloopPossibilityです。
このコンポーネントではLoopProcessプロパティの値がtrueの場合だけループする可能性があるので、LoopProcessプロパティの設定値をそのまま返します。
executeメソッドでLoopProcessを判断して分岐
LoopProcessがfalseの場合はこれまでのコードで対応できますが、trueの場合は1行ずつログを読む別のロジックを組む必要があります。
またその場合はexecuteメソッドの返り値をfalseにします。
executeLoopメソッドを実装
ループのあるコンポーネントではexecuteLoopメソッドでループ処理を実装する必要があります。

ループ処理の実装に必要な作業は以上です。
それに加えて

を行ったコードを次にあげておきます。
それぞれの処理に関してはJavaの領域になってしまうので、ここでは説明を加えませんがコンポーネント開発ガイドやJavaDocを参照すれば 処理の内容は理解できると思います。

package com.infoteria.sample.component;

import com.infoteria.asteria.flowengine2.execute.ExecuteContext;
import com.infoteria.asteria.flowengine2.execute.SystemVariables;
import com.infoteria.asteria.flowengine2.flow.InputConnector;
import com.infoteria.asteria.flowengine2.flow.OutputConnector;
import com.infoteria.asteria.flowlibrary2.component.ComponentException;
import com.infoteria.asteria.flowlibrary2.component.SimpleComponent;
import com.infoteria.asteria.flowlibrary2.FlowException;
import com.infoteria.asteria.flowlibrary2.property.BooleanProperty;
import com.infoteria.asteria.flowlibrary2.property.StringProperty;
import com.infoteria.asteria.flowlibrary2.stream.StreamDataObject;
import com.infoteria.asteria.flowlibrary2.stream.StreamException;
import com.infoteria.asteria.flowlibrary2.stream.StreamFactory;
import com.infoteria.asteria.flowlibrary2.stream.StreamFactoryRecord;
import com.infoteria.asteria.flowlibrary2.stream.StreamFactoryText;
import com.infoteria.asteria.flowlibrary2.stream.StreamFactoryXML;
import com.infoteria.asteria.flowlibrary2.stream.StreamType;
import com.infoteria.asteria.log.parser.LogItem;
import com.infoteria.asteria.log.parser.LogReader;
import com.infoteria.asteria.value.Value;
import java.io.File;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import org.w3c.dom.Document;

public class LogReaderComponent extends SimpleComponent
{
    public static final String COMPONENT_NAME = "LogReader";
    public String getComponentName() { return COMPONENT_NAME;}

    private StringProperty _propFilePath = new StringProperty("FilePath", true, true);
    private BooleanProperty _propLoopProcess = new BooleanProperty("LoopProcess", true, false, true);
    
    private SimpleDateFormat _format = new SimpleDateFormat(LogItem.getDateFormat());
    private LogReader _reader = null;

    public LogReaderComponent()
    {
        int ia = StreamType.ALL;
        int oa = StreamType.RECORDS;
        getInputConnector().setAcceptType(ia);
        getInputConnector().setAcceptLinkCount(-1);
        getOutputConnector().setAcceptType(oa);

        registProperty(_propFilePath);
        registProperty(_propLoopProcess);
    }
    
    public boolean loopPossibility() {
        return _propLoopProcess.booleanValue();
    }
    
    public boolean execute(ExecuteContext context) throws FlowException {
        //ログファイルの取得
        File logFile = new File(_propFilePath.strValue());
        if (!logFile.isAbsolute()) {
            SystemVariables sv = context.getSystemVariables();
            String fspath = sv.getValue(SystemVariables.ASTERIA_DIRECTORY).strValue();
            logFile = new File(fspath + File.separator + "log", _propFilePath.strValue());
        }
        
        if (!_propLoopProcess.booleanValue()) {
            //ループじゃない場合
            try {
                ArrayList list = new ArrayList();
                
                LogReader reader = new LogReader(logFile);
                try {
                    LogItem item = reader.read();
                    while (item != null) {
                        list.add(getLogValues(item));
                        item = reader.read();
                    }
                } finally {
                    reader.close();
                }
                StreamFactoryRecord factory = (StreamFactoryRecord)getOutputConnector().getStreamFactory();
                setOutputStream(factory.create(list));
            } catch (IOException e) {
                throw new ComponentException(e);
            }
            return true;
        } else {
            //ループのある場合
            boolean ret = false;
            try {
                ArrayList list = new ArrayList();
                
                _reader = new LogReader(logFile);
                LogItem item = _reader.read();
                if (item == null) {
                    ret = true;
                    _reader.close();
                    _reader = null;
                } else 
                    list.add(getLogValues(item));
                StreamFactoryRecord factory = (StreamFactoryRecord)getOutputConnector().getStreamFactory();
                setOutputStream(factory.create(list));
            } catch (IOException e) {
                if (_reader != null) {
                    try {
                        _reader.close();
                    } catch (IOException e2) {
                    }
                    _reader = null;
                }
                throw new ComponentException(e);
            }
            return ret;
        }
    }
    
    public int executeLoop(ExecuteContext context) throws FlowException {
        //_readerからログを読み出し次の行がない場合はループ処理なし(LOOP_NOTHING)とし、
        //ある場合はその行をRecordストリームにして出力
        try {
            LogItem item = _reader.read();
            if (item == null) {
                _reader.close();
                _reader = null;
                return LOOP_NOTHING;
            } else {
                ArrayList list = new ArrayList();
                list.add(getLogValues(item));
                StreamFactoryRecord factory = (StreamFactoryRecord)getOutputConnector().getStreamFactory();
                setOutputStream(factory.create(list));
                return LOOP_CONTINUE;
            }
        } catch (IOException e) {
            if (_reader != null) {
                try {
                    _reader.close();
                } catch (IOException e2) {
                }
                _reader = null;
            }
            throw new ComponentException(e);
        }
    }
    
    private Value[] getLogValues(LogItem item) {
        Value[] values = new Value[6];
        values[0] = new Value(item.getLineNumber());
        try {
            values[1] = new Value(_format.parse(item.getDateStr()));
        } catch (ParseException e) {
            values[1] = new Value(Value.TYPE_DATETIME);
        }
        values[2] = new Value(item.getLevel());
        values[3] = new Value(item.getCategory());
        values[4] = new Value(item.getThreadName());
        values[5] = new Value(item.getMessage());
        return values;
    }
}

6.3 その他の改良

これ以外にも

など色々と改良の余地はあると思います。
ここまでの内容が理解できていればそれらはそんなに難しくないと思うので是非色々と試してみてください。