コンポーネント開発者ガイド

  1. コンポーネント開発について
    1. JDK・クラスパス
  2. コンポーネントの構成要素
    1. 入力コネクタセット
    2. 出力コネクタセット
    3. コンポーネントプロパティ
    4. ストリームファクトリ
  3. コンポーネントのメソッド
    1. フロー実行の流れ
    2. コンポーネントのインスタンス
    3. メソッドの種類
      1. execute
      2. executeLoop
      3. init
      4. term
      5. endFlow
  4. ExecuteContext
    1. ログ出力について
    2. フローのログのカテゴリ
    3. 変数の取得
    4. プロジェクトオーナーと実行ユーザ
    5. コネクション
    6. トランザクション
  5. SimpleComponentサブクラスの作成
    1. サンプル1
  6. コンポーネントプロパティ
    1. ValueProperty
    2. CategoryProperty
    3. ConnectionProperty
    4. サンプル2
  7. 入力ストリーム
    1. ストリームのデータアクセス
    2. Recordの使用
    3. StreamDataContainerの扱い
  8. 出力ストリームの作成
    1. ストリームプロパティとStreamFactory
    2. Binaryストリーム
    3. Textストリーム
    4. HTMLストリーム
    5. CSVストリーム
    6. FixedLengthストリーム
    7. XMLストリーム
    8. Recordストリーム
    9. ParameterListストリーム
    10. MIMEストリーム
    11. ストリームコンテナ
  9. ストリーム変数
  10. Exception
    1. メッセージ
    2. ExceptionParam
    3. ストリーム
    4. State
  11. ループ
  12. Connectionの使用
    1. サンプル3
  13. トランザクション
    1. トランザクションを処理するコンポーネントの作成方法
      1. コネクションを使用しているコンポーネントでのTransaction処理
  14. サンプル
  15. その他のトピック
    1. 拡張プロパティの作成
    2. 拡張コンパイラの作成
    3. cloneメソッドのオーバーライド

1 コンポーネント開発について

フローサービスでは独自のコンポーネントをJavaで開発し追加することが可能になっています。
コンポーネント開発を行う場合、以下の作業が必要になります。

jarファイルの作成に関しては実際にはJavaInterpreterコンポーネント付属のSDKウィザードでほぼ自動化されています。
ここではComponentクラスのソースコードの書き方に絞って説明します。
定義ファイルの作成に関しては定義ファイルリファレンスを、 jarファイルの作成とそのインストールに関してはツールガイドを参照してください。

1.1 JDK・クラスパス

コンポーネントの開発はJDK5.0以降の環境で行ってください。
また、「DESIGNER_HOME/lib」にある以下のjarファイルにクラスパスを通してください。

実際にはウィザードで生成されるANT用のbuild.xmlでは「DESIGNER_HOME/lib/**.*.jar」にクラスパスが通っています。
作成するプログラムの内容によってはさらに別のjarファイルがコンパイル/実行に必要になる場合があるので、通常はbuild.xmlの設定を変更する必要はありません。

2 コンポーネントの構成要素

実装面から言えばコンポーネントとは「com.infoteria.asteria.flowengine2.flow.Component」のサブクラスのことです。
このクラスの各種メソッドをオーバーライドすることでコンポーネントは作成されます。
コンポーネントは大きくは以下の要素で構成されます。

2.1 入力コネクタセット

入力コネクタは入力ストリームの受け口です。
コンポーネントは必ず一つの入力コネクタセットを持ちます。
入力コネクタセットには一つのデフォルトコネクタと複数のサブコネクタを定義することができます。
入力コネクタはストリームを一つだけ受け入れるように定義することもできますし、複数のストリームを受け入れるようにすることもできます。
また受け入れ可能なストリームフォーマットを制限することもできます。

コンポーネントは入力コネクタにストリームがセットされてはじめて実行可能な状態になるので、コンポーネント実行時には少なくとも一つ以上の ストリームがデフォルトコネクタにセットされています。

実装的には入力コネクタセットはComponentEntranceに、
入力コネクタはInputConnectorに対応します。

現在はSDKでは入力サブコネクタを持つコンポーネントの開発はサポートされていません。
入力コネクタとしてはデフォルトコネクタのみが使用可能です。
サブコネクタがサポートされない現在ユーザが直接ComponentEntranceを操作することはありません。

2.2 出力コネクタセット

出力コネクタはストリームの出口です。
コンポーネントは一つ以上の出力コネクタセットを持ちます。
出力コネクタセットが複数ある場合は、それぞれの出力コネクタセットはブランチによる分岐に対応します。
つまり出力コネクタセットが複数ある場合でも実際にストリームの出口として使用される出力コネクタセットは一つだけです。

出力コネクタセットには一つのデフォルトコネクタと複数のサブコネクタを定義することができます。
コンポーネントはそのexecute(またはexecuteLoop)メソッドの中で必ず使用する出力コネクタセットのデフォルトコネクタにストリームをセットしなければなりません。
サブコネクタはセットされた場合でも、そこへの出力ストリームのセットは必須ではありません。
コンポーネントはサブコネクタに他のコンポーネントがリンクされていない場合でも問題なく動作します。

実装的には出力コネクタセットはComponentExitに、
出力コネクタはOutputConnectorに対応します。

現在はSDKではサブコネクタおよび、2つ以上の出力コネクタセットを持つコンポーネントはサポートされていません。
出力コネクタとしてはデフォルトコネクタのみが使用可能です。
サブコネクタがサポートされない現在ユーザが直接ComponentExitを操作することはありません。

2.3 コンポーネントプロパティ

コンポーネントプロパティとはコンポーネントの動作を決定するのに使用される各種設定情報です。
コンポーネントプロパティへの値の設定は設計時に行うこともできますし、実行時にMapperで挿し込むこともできます。

コンポーネント開発では各種Propertyクラスをコンポーネントに登録することで実装されます。
プロパティクラスは標準で提供されているクラスをそのまま使用することもできますし、自分で拡張することもできます。

2.4 ストリームファクトリ

それぞれの出力コネクタにはストリームファクトリが関連付けられています。 ただし入力ストリームをそのまま出力するようなコンポーネントはストリームファクトリを持ちません。

ストリームファクトリはコンポーネントで定義されたストリームプロパティとフィールド定義を保持します。
それらはコンポーネントによって作成されたストリームに対して1度だけ適用されます。
(ストリームに対して明示的にファクトリの適用メソッドを複数回呼び出したとしても2回目以降は無視されます。)

現状ではSDKでのコンポーネント開発は一組の入出力コネクタセットを持つSimpleComponentのサブクラスを作成することで行われます。

3 コンポーネントのメソッド

3.1 フロー実行の流れ

まずフローが実行されるとは、どういうことかを明確にします。
視覚的にはフローとはさまざまな操作を行うコンポーネントをリンク線で接続したものです。
フローの実行エンジンが行っていることはこれらのコンポーネントをリンク線の示す順序に従って実行していくことです。具体的にはコンポーネントの実行時にはexecute、またはexecuteLoopメソッドが実行されます。
コンポーネント開発者は作成するコンポーネントでこれらのメソッドをオーバーライドして、処理内容を記述することでコンポーネントを完成させます。

実行時に処理を実行するだけのコンポーネントであればexecuteメソッドを実装するだけで作成することができます。
それ以外にも、コンポーネントの初期化時に実行されるメソッド(init)や終了時に実行されるメソッド(term)など、フローの実行過程で呼び出されるメソッドは全部で5種あります。必要に応じてこれらのメソッドをオーバーライドすることにより複雑な処理を行うコンポーネントを作成することができます。

3.2 コンポーネントのインスタンス

コンポーネントのインスタンスはリクエストを受けてフローが初期化されたタイミングで生成されます。
この時、コンポーネントにはフローデザイナーで定義されたプロパティがすべてセットされています。(Mapperによって実行時に値を差し込まれることはあります。)
コンポーネントのインスタンスはフローデザイナー上でのアイコン単位で作成されます。
つまり同種のコンポーネントを複数フロー上に配置した場合はそれらは別のインスタンスとなりますが、 ループで同じコンポーネントが複数回実行される場合は同一インスタンスなので、そのexecuteメソッドは複数回実行されることになります。
サブフロー(とそれに含まれるコンポーネント)のインスタンスは最初にサブフローが実行される時に生成されます。
同一フローを参照するサブフローコンポーネントが複数ある場合はそれごとにインスタンスが生成され、ループの際には同一インスタンスが複数回実行される 点は通常のコンポーネントと同じです。

3.3 メソッドの種類

フローが実行される時に呼び出されるメソッドとしては以下の5つがあります。

それぞれのメソッドはExecuteContext(実行コンテキスト)を引数として呼び出されます。
executeメソッドはabstractメソッドとして定義されているので必ず実装する必要があります。
それ以外のメソッドについてはデフォルトでは何もしないメソッドが実装されているので、必要がなければオーバーライドする必要はありません。

ASTERIA3まではこれ以外にトランザクションを処理するためのcommit/rollbackメソッドがありましたが、トランザクションはTransactionインターフェースで処理されるモデルに変更されたため現在は使用されていません。(トランザクションについては後述します。)

3.3.1 execute

コンポーネントの実行処理を記述します。

コンポーネントが処理を行うタイミングで実行されます。
このメソッドの中では必ずストリームを出力コネクタにセットしなければなりません。

このメソッドの返り値はboolean型であり、返り値はコンポーネントがループの起点になるかどうかを示します。
このメソッドの返り値がfalseであった場合、フローの実行エンジンはこのコンポーネントをループのスタックに入れます。実行エンジンはその後、このコンポーネントに接続された次のコンポーネントからフローの実行を続け、終了コンポーネントまで実行されたらループスタックからコンポーネントを取り出し、そのコンポーネントのexecuteLoopメソッドを実行します。
falseを返す実装を行った場合は、executeLoopメソッドもあわせて実装する必要があります。
逆から言えばループの起点とならないコンポーネントでは、このメソッドは常にtrueを返します。

このメソッドがExceptionをthrowした場合はフローは設定されているExceptionフローにジャンプします。

executeメソッドの実装例
HttpGet Httpで指定のサーバーに対してリクエストを発行し、そのレスポンスを 出力ストリームとします。
返り値は常にtrueを返します。
Log 指定のログを出力します。
出力ストリームには入力ストリームがそのまま渡され、常にtrueを返します。
FileGet FilePathに指定したファイルを取得してストリームを作成し出力します。
FilePathにマッチするファイルが一つの場合かLoopProcessがFalseの場合は、 返り値はtrueになります。 FilePathにマッチするファイルが複数あり、かつLoopProcess=Trueの場合は、 最初のファイルをストリームとして出力し、falseを返します。
(2つ目以降のファイルはexecuteLoopメソッドで処理されます。)

3.3.2 executeLoop

コンポーネントのループでの実行処理を記述します。

ループの起点となるコンポーネントがループ処理を実行するタイミングで実行されます。
つまりループの起点となるコンポーネントでは最初の1回目の実行ではexecuteメソッドが実行され、2回目以降の実行ではexecuteLoopメソッド が実行されます。
(このコンポーネントよりも前にループの起点となるコンポーネントがあった場合はループ処理の中で毎回executeメソッドが実行されます。)
このメソッドの中では必ずストリームを出力コネクタにセットしなければなりません。

このメソッドの返り値はint型であり、戻り値にはComponentクラスで宣言されている以下のシンボルのいずれかを使用します。
それぞれのシンボルには次のような意味があります。

execute(またはexecuteLoop)メソッドの実行中には、次回のループ処理が必要かどうかわからないと言うケースが存在します。
そういう場合、とりあえずループがあるという返り値を返しておいて、次回のexecuteLoopメソッド中で処理の必要がなければLOOP_NOTHINGを返します。

このメソッドがExceptionをthrowした場合はフローは設定されているExceptionフローにジャンプします。

executeLoopメソッドの実装例
LoopStart LoopCountをデクリメントし、0であればLOOP_ENDを、1以上であれば LOOP_CONTINUEを返します。
FileGet
(LoopProcess=True)
executeメソッド実行時に作成したFilePathにマッチするファイルのリスト から、次に処理するファイルをストリームにして出力します。
リストを最後まで処理した場合はLOOP_ENDを返し、まだ続きがある場合は LOOP_CONTINUEを返します。
RDBGet
(LoopProces=True)
executeメソッド実行時に取得したResultSetに対して、nextメソッドを 発行します。
ResultSetに次のレコードが存在する場合は、それを1行のRecordストリーム として出力し、LOOP_CONTINUEを返します。
次のレコードがない場合はLOOP_NOTHINGを返します。

3.3.3 init

コンポーネントの初期化処理を記述します。

このコンポーネントが最初に実行される直前に実行されます。
ループの中で複数回execute/executeLoopが実行されるコンポーネントであってもinitが実行されるのは1度だけです。
ブランチにより1度も実行されないコンポーネントではinitも実行されません。

このメソッドがExceptionをthrowした場合はフローは設定されているExceptionフローにジャンプします。
この場合はinitメソッド自体は完了していないので、(Exceptionから復帰した後にループの中で)再度このコンポーネントが実行される場合には初回の実行であるとしてinitメソッドが実行されます。

initメソッドが実行されなかった場合(Exceptionをthrowした場合)はtermメソッドは実行されないことに注意してください。
つまり、その場合に必要な終了処理があればメソッド内で行っておく必要があります。

initメソッドの実装例
RDBGet 実行コンテキストからRDBのコネクションを取得します。

3.3.4 term

コンポーネントの終了処理を記述します。

メインフローの終了直前にinitメソッドが実行されたすべてのコンポーネントのtermメソッドが順番に実行されます。
フローがExceptionにより終了した場合でもtermメソッドはそれまでに実行したすべてのコンポーネントについて実行されます。
フローの中でサブフローやExceptionフローが実行された場合は、それらに含まれるコンポーネントのtermメソッドもメインフローの終了時にあわせて実行されます。
このメソッドではExceptionをthrowすることはできません。

termメソッドの実装例
FilePut
(AppendMode=Append)
ファイルをクローズします。

3.3.5 endFlow

コンポーネントのフロー終了処理を記述します。

ひとつのフローの実行が終了する度に、そのフロー内で実行されたコンポーネントendFlowメソッドが順番に実行されます。
フロー内にサブフローコンポーネントが存在する場合、そのサブフローに含まれるコンポーネントはendFlowメソッドの実行対象とはなりません。
endFlowメソッドはあくまで、実行されたフロー上に存在するコンポーネントだけを対象とします。

サブフロー内のコンポーネントのendFlowメソッドは、そのサブフローの実行終了時に実行されます。
ループでそのサブフローが複数回実行される場合は、その都度そのサブフロー内のコンポーネントのendFlowメソッドは実行されます。

フローがトランザクション化されている場合はendFlowメソッドはTransactionManagerのcommitまたはrollback後に実行されます。

このメソッドではExceptionをthrowすることはできません。

endFlowメソッドの実装例
RecordGet ファイルをクローズします。

4 ExecuteContext

ExecuteContextはフローの実行コンテキストを表すクラスです。
コンポーネントの各メソッドにはこのインスタンスが引数として渡されます。
このクラスを用いることで

を行うことができます。

4.1 ログ出力について

ASTERIA WARPではログ出力のライブラリとしてApache Jakarta Projectの Log4Jを使用しています。
ExecuteContextの各種ログ出力メソッドはLog4Jのラッパーとなっています。

ExecuteContextには以下のログ出力用のメソッドがあり、それぞれのメソッドに文字列を渡した場合のログの出力形式(Log4Jのパターン"%m"に対応)は以下のようになります。

メソッド 出力形式
debug メッセージコード: [セッションID] デバッグ(コンポーネント名) :メッセージ
debugInfo メッセージコード: [セッションID] デバッグ情報(コンポーネント名) :メッセージ
info メッセージコード: [セッションID] 情報(コンポーネント名) :メッセージ
warn メッセージコード: [セッションID] 警告(コンポーネント名) :メッセージ
error メッセージコード: [セッションID] エラー(コンポーネント名) :メッセージ
fatal メッセージコード: [セッションID] 致命的エラー(コンポーネント名) :メッセージ

debugInfo以外の各メソッドはLog4Jのpriorityに対応しています。
debugInfoはフローの実行がnormalモードで行われている場合は debugレベルで、それ以外のモードで実行されている場合は infoレベルとして出力されます。

4.2 フローのログのカテゴリ

フローの実行ログのカテゴリは

asteria.flow.プロジェクトオーナー名.プロジェクト名.フロー名

というカテゴリになります。(プロジェクトオーナー名はドメインを含むフルネームで、ドメインは「.」で区切られます。)
つまり、フローごとにログの出力先を変えたり、ログ出力レベルを変更したりすることができます。

4.3 変数の取得

各種変数は以下のメソッドで取得することができます。

変数 メソッド クラス
フロー変数 getFlowVariables VariableList
システム変数 getSystemVariables SystemVariables
外部変数 getExternalVariables ExternalVariablesVariableList

注意
VariableListやそれが保持しているValueクラスには値の設定メソッドがありますが、コンポーネントの中でフロー変数などに直接値を設定してはいけません。
Mapper以外のコンポーネントが値を設定してしまうと、フロー開発者がそれをトレースできなくなります。

4.4 プロジェクトオーナーと実行ユーザ

それぞれ getProjectOwnerメソッドとgetUserメソッドで取得することができます。
getProjectOwnerがnullを返すことはありませんが、getUserは実行ユーザが存在しない場合にnullを返します。

4.5 コネクション

ConnectionPropertyをキーにして、getConnectionメソッドで使用するコネクションを取得することができます。
この時該当するコネクションが存在しない場合はExceptionとなります。

取得するコネクションが既にフロー内で使用されている場合は、このメソッドで取得できるコネクションのインスタンスは先に使用されたコネクションと同じになります。
例えば、作成するコンポーネントがRDBコネクションを使用し、「TEST1」というRDBConnectionを取得しようとした場合に、それ以前 RDBGet/RDBPutなどのコンポーネント が「TEST1」というコネクションを使用していた場合は取得されるコネクションは先に使用されたものと同じインスタンスになります。

コネクションがそのリクエスト内で最初に取得された場合はFlowConnection#initメソッドが、2回目以降はFlowConnection#resetメソッドがExecuteContext#getConnectionメソッド内で実行されます。

コネクションの詳細な使用方法はConnectionの使用の章を参照してください。

4.6 トランザクション

コンポーネントがトランザクションをサポートする場合はexecuteまたはexecuteLoopメソッド内でTransactionインターフェースを実装したクラスを作成して、ExecuteContext#addTransactionメソッドに渡します。
追加されたTransactionはTransactionManagerによって適切なタイミングで実行されます。

トランザクションの詳細についてはトランザクションの章を参照してください。

5 SimpleComponentサブクラスの作成

SimpleComponentとは入力コネクタをひとつ、出力コネクタをひとつ持つもっとも基本的なコンポーネントの基底クラスです。
フローサービスに標準で登録されているコンポーネントのほとんどはこのクラスを継承して作成されており、ユーザの作成するコンポーネントもそうすることを推奨します。

まずは何も処理を行わず入力ストリームをそのまま出力するだけのコンポーネントを以下に示します。

import com.infoteria.asteria.flowengine2.execute.ExecuteContext;
import com.infoteria.asteria.flowengine2.flow.InputConnector;
import com.infoteria.asteria.flowlibrary2.FlowException;
import com.infoteria.asteria.flowlibrary2.component.SimpleComponent;
import com.infoteria.asteria.flowlibrary2.stream.StreamType;

public class DoNothingComponent extends SimpleComponent {
    
    public static final String COMPONENT_NAME = "DoNothing";
    public String getComponentName() { return COMPONENT_NAME;}           //1
    
    public DoNothingComponent() {                                        //2
        getInputConnector().setAcceptLinkCount(1);                       //3
        //getInputConnector().setAcceptContainer(true);                  //4
        //getInputConnector().setExpandContainer(false);                 //5
        getInputConnector().setAcceptType(StreamType.ALL);               //6
        getOutputConnector().setAcceptType(StreamType.ALL);              //7
    }
    
    public boolean execute(ExecuteContext context) throws FlowException {//8
        passStream();                                                    //9
        return true;                                                     //10
    }
    
}

数字のコメントがついている個所ではそれぞれ以下のことを行っています。

  1. コンポーネント名を返すメソッド
    このメソッドはabstractメソッドなので必ず実装する必要があります。
    ここで返されるコンポーネント名はサーバー上に登録されるすべてのコンポーネント内でユニークでなければなりません。
    コンポーネント名には「<会社名>.」というプレフィクスをつけてください。
    Infoteria以外の会社が「.」を含まない名前をコンポーネント名に使用することは禁止事項とします。

  2. コンポーネントには引数なしのコンストラクタが必要です。

  3. 入力コネクタが受け入れることのできるストリームの最大値を設定しています。
    多くのコンポーネントは受け入れ可能なストリーム数は1です。
    無制限にストリームを受け入れることができるコンポーネントの場合は

            getInputConnector().setAcceptLinkCount(InputConnector.LINK_UNBOUNDED);
    

    のように InputConnector#LINK_UNBOUNDEDというシンボルを使用して値を設定します。

  4. コネクタがストリームコンテナを受け入れるかどうかを設定しています。
    コンポーネントがストリームコンテナを受け入れない場合に、ストリームコンテナが流れてきた場合はExceptionが発生します。
    この設定のデフォルト値はtrueなのでソース中ではコメントアウトされています。

  5. コネクタがストリームコンテナを受け入れた時にそれを展開するかどうかを設定しています。
    複数ストリームを受け入れ可能なコネクタではgetStreamArrayメソッドを使用することで、入力ストリームを配列として取得することができます。
    この時にこの設定値がtrueであれば、コンテナ内のストリームがすべて展開されて配列の要素となります。
    falseの場合はストリームコンテナ自体が配列の1要素となります。
    この設定のデフォルト値はfalseなのでソース中ではコメントアウトされています。

  6. 入力コネクタが受け入れ可能なストリームフォーマットを指定しています。
    設定値にはStreamTypeクラスで宣言されているシンボルを使用します。
    例えばTextとHTMLのストリームだけを受け入れる場合は

            getInputConnector().setAcceptType(StreamType.TEXT|StreamType.HTML);
    

    のように設定します。
    すべてのストリームフォーマットを受け入れる場合はStreamType.ALLを使用します。
    受け入れ不可のストリームが流れてきた場合は、Exceptionが発生します。

  7. 出力コネクタが受け入れ可能なストリームフォーマットを指定しています。
    指定方法は入力コネクタの場合と同じです。

    3〜7はすべてのコンポーネントのコンストラクタ内で設定が必要です。
    コンストラクタを作成したら最初に設定するようにして下さい。

  8. executeメソッド
    このメソッドはabstractメソッドなので必ず実装する必要があります。

  9. passStreamメソッドを使用すると入力ストリームがそのまま出力ストリームとしてセットされます。
    コンポーネント内でストリームを作成して出力する場合は

            StreamDataObject os = ...;//出力ストリームを作成する。(後述)
            setOutputStream(os);
    

    のように出力ストリームを設定します。

  10. このコンポーネントはループの起点とならないので常にtrueを返しています。

5.1 サンプル1

import com.infoteria.asteria.flowengine2.execute.ExecuteContext;
import com.infoteria.asteria.flowengine2.flow.InputConnector;
import com.infoteria.asteria.flowlibrary2.FlowException;
import com.infoteria.asteria.flowlibrary2.component.SimpleComponent;
import com.infoteria.asteria.flowlibrary2.stream.StreamType;

/**
 * コンポーネントの各メソッドが実行されたときにログを出力するコンポーネント
 */
public class MethodLogComponent extends SimpleComponent {
    
    public static final String COMPONENT_NAME = "MethodLog";
    public String getComponentName() { return COMPONENT_NAME;}
    
    public MethodLogComponent() {
        getInputConnector().setAcceptLinkCount(1);
        //getInputConnector().setAcceptContainer(true);
        //getInputConnector().setExpandContainer(false);
        getInputConnector().setAcceptType(StreamType.ALL);
        getOutputConnector().setAcceptType(StreamType.ALL);
    }
    
    public void init(ExecuteContext context) throws FlowException {
        context.info("init");
    }
    
    public void term(ExecuteContext context) {
        context.info("term");
    }
    
    public boolean execute(ExecuteContext context) throws FlowException {
        context.info("execute");
        passStream();
        return false;
        //executeLoopメソッドも実行させたいのでここではfalseを返すことにします。
    }
    
    public int executeLoop(ExecuteContext context) throws FlowException {
        context.info("executeLoop");
        passStream();
        return LOOP_END;
        //1回ループ処理を実行したら、もうループさせたくないのでLOOP_ENDを返します。
        //ここで常にLOOP_CONTINUEを返すようなコードを書くとフローが無限ループして永遠に終わりません。
    }
    
    /** ループの起点となる可能性のあるコンポーネントではloopPossibilityをオーバーライドしてtrueを返します。 */
    public boolean loopPossibility() {
        return true;
    }
}

6 コンポーネントプロパティ

コンポーネントプロパティはPropertyインターフェースを実装するクラスをコンポーネントに登録することで利用できます。
通常はASTERIA WARPが標準で提供するプロパティのインスタンスを生成して登録します。
プロパティの登録はコンポーネントのコンストラクタで行います。

Mapperによって値の設定/参照が可能なのはValuePropertyまたはCategoryPropertyのサブクラスです。
標準で提供されるプロパティのほとんどはこのどちらかのサブクラスになっています。
標準で提供されるプロパティに何があるかについてはJavaDocを参照してください。

6.1 ValueProperty

ValuePropertyはインスペクタ上でプロパティ名と値が1対1で対応するプロパティの基底クラスです。
設定値の種類により、StringPropertyIntegerPropertyなどがあります。
また、プロパティ値として許容する値を制限するEnumPropertyやファイルパスを解決するためのPathResolverPropertyといったクラスも標準で提供されています。

6.2 CategoryProperty

CategoryPropertyはインスペクタ上でタブページとして表示されるプロパティに対応します。
これは名前と値のセットをフロー上で定義するためのプロパティです。
名前、データ型、デフォルト値を持つCategoryPropertyを使用する場合はSimpleCategoryPropertyを使用します。

それ以上の項目を持つタブページを作成する場合はExtendedCategoryPropertyを使用します。

6.3 ConnectionProperty

ConnectionPropertyはASMCで定義する各種コネクションを使用するためのプロパティです。
このプロパティをキーにしてExecuteContext#getConnectionメソッドを呼び出すことにより各種コネクションを取得できます。
ConnectionPropertyはMapperによって値を設定することはできません。
コネクションの詳細な使用方法は「Connectionの使用」を参照してください。

6.4 サンプル2

いくつかのPropertyクラスを使用するサンプルです。

プロパティ「A」「B」に任意の値を設定するとその和がプロパティ「C」に設定されます。

プロパティ「MyCategory」に名前と値のセットを定義すると、プロパティ「LogMethod」で指定されたメソッドでログに出力されます。

プロパティ「Connection」にコネクション名を設定すると、そのRDBConnectionのドライバ情報がプロパティ「DriverName」に設定されます。

import com.infoteria.asteria.flowengine2.execute.ExecuteContext;
import com.infoteria.asteria.flowengine2.flow.InputConnector;
import com.infoteria.asteria.flowlibrary2.FlowException;
import com.infoteria.asteria.flowlibrary2.component.SimpleComponent;
import com.infoteria.asteria.flowlibrary2.stream.StreamType;
import com.infoteria.asteria.flowlibrary2.property.*;
import com.infoteria.asteria.flowengine2.connection.RDBConnection;
import com.infoteria.asteria.connection.RDBConnectionEntry;
import java.util.Iterator;
import java.sql.Connection;
import java.sql.SQLException;

/**
 * Propertyのテストコンポーネント
 */
public class PropertyTestComponent extends SimpleComponent {
    
    public static final String COMPONENT_NAME = "PropertyTest";
    public String getComponentName() { return COMPONENT_NAME;}
    
    private IntegerProperty propA = new IntegerProperty("A", true, true);
    private IntegerProperty propB = new IntegerProperty("B", true, true);
    private IntegerProperty propC = new IntegerProperty("C", false, true);
    
    private SimpleCategoryProperty propCategory = new SimpleCategoryProperty("MyCategory");
    private EnumProperty propLogMethod = new EnumProperty("LogMethod", true, false);
    
    private ConnectionProperty propConnection = new ConnectionProperty(RDBConnectionEntry.TYPE, "Connection", false);
    private StringProperty propDriver = new StringProperty("DriverName", false, true);
    
    public PropertyTestComponent() {
        getInputConnector().setAcceptLinkCount(1);
        //getInputConnector().setAcceptContainer(true);
        //getInputConnector().setExpandContainer(false);
        getInputConnector().setAcceptType(StreamType.ALL);
        getOutputConnector().setAcceptType(StreamType.ALL);
        
        //LogMethodプロパティで許容する値を設定する
        propLogMethod.addEnumeration("fatal");
        propLogMethod.addEnumeration("error");
        propLogMethod.addEnumeration("warn");
        propLogMethod.addEnumeration("info");
        propLogMethod.addEnumeration("debugInfo");
        propLogMethod.addEnumeration("debug");
        
        //プロパティの登録
        registProperty(propA);
        registProperty(propB);
        registProperty(propC);
        registProperty(propCategory);
        registProperty(propLogMethod);
        registProperty(propConnection);
        registProperty(propDriver);
    }
    
    public boolean execute(ExecuteContext context) throws FlowException {
        //プロパティ「A」と「B」に値を設定してその和をプロパティ「C」に設定する。
        int a = propA.intValue();
        int b = propB.intValue();
        propC.setValue(a + b);
        
        //プロパティ「MyCategory」に設定したNameと値をプロパティ「LogMethod」で指定されたメソッドでログに出力する。
        Iterator it = propCategory.keySet().iterator();
        while (it.hasNext()) {
            String name = (String)it.next();
            String msg = name + " = " + propCategory.getValue(name); 
            switch (propLogMethod.getIndex()) {
                case 0: context.fatal(msg); break;
                case 1: context.error(msg); break;
                case 2: context.warn(msg); break;
                case 3: context.info(msg); break;
                case 4: context.debugInfo(msg); break;
                case 5: context.debug(msg); break;
            }
        }
        
        //プロパティ「Connection」に設定されたRDBConnectionのドライバ情報をプロパティ「DriverName」に設定する。
        if ( !propConnection.isNull()) {
            RDBConnection rcon = (RDBConnection)context.getConnection(propConnection);
            Connection con = rcon.getConnection();//ここで取得したjava.sql.Connectionは自由に使用することが出来る。
            
            try {
                propDriver.setValue(con.getMetaData().getDriverName());
            } catch (SQLException e) {
                //例外が発生した場合はそのメッセージをプロパティに設定する
                propDriver.setValue(e.toString());
            }
        }
        passStream();
        return true;
    }
    
}

7 入力ストリーム

入力ストリームはInputConnectorから取得します。
複数ストリームの受け入れが可能なコンポーネントではInputConnector#getStreamArrayメソッドで、StreamDataObjectの配列を、 単一ストリームのみを受け入れるコンポーネントではInputConnector#getStreamメソッドを使用してStreamDataObjectを取得します。

7.1 ストリームのデータアクセス

すべてのストリームクラスの基底クラスであるStreamDataObjectにはbyteValue, strValue, getRecordという3つのメソッドがあり、 それぞれストリームのバイナリ値、文字列値、レコードを取得することが出来ます。
またStreamDataXML#getDocumentメソッドのようにストリーム固有のデータアクセスメソッドがある場合もあります。

コンポーネントはこれらのメソッドを使用して入力ストリームのデータにアクセスしながら処理を行います。

メソッド 取得されるオブジェクト
byteValue byte[]
strValue java.lang.String
getRecord com.infoteria.asteria.flowlibrary2.stream.Record

入力ストリームを扱う際の注意

これらのメソッドで取得されるバイト列やRecord、StreamDataXMLのDocumentなどに対して直接変更を加えてはいけません。
これらのメソッドの多くはデータのコピーではなく、参照を返します。もしも自作コンポーネントが入力ストリームのデータを直接変更してしまった場合、

  FileSystem(Get) ------->  MyComponent
                  ------->  OtherComponent

のようなパラレルのあるフローでは、MyComponentの行った変更がOtherComponentの方で参照されることになります。
つまりOtherComponentで取得される入力ストリームがFileSystem(Get)が出力した状態と異なることになるので、このような操作は行ってはいけません。

7.2 Recordの使用

ストリームのRecordインターフェースはStreamDataObject#getRecordメソッドにより取得されます。
ストリームがレコードを持たない場合はこのメソッドはnullを返します。(フィールド定義を持たないXMLストリームなどがこれにあたります。)

Recordインターフェースはストリーム内の各レコードのイテレータになっていて、getRecordメソッドで取得された時点でそのポインタは先頭レコードを指しています。
通常はnextやnextRecordメソッドを使用して1レコードずつポインタをずらしながら各レコードにアクセスします。
典型的なレコードインターフェースの使用方法は以下のようになります。

    StreamDataObject is;
    Record record = is.getRecord();
    while (is != null) {
      Value v1 = record.getValue("Field1");
      Value v2 = record.getValue("Field2");
      //ここで何かv1, v2を用いた処理を行う
      record = record.nextRecord();
    }

Recordインターフェースにはnext以外にもabsoluteなどレコード番号を指定して各レコードにアクセスするメソッドもありますが、これらのメソッドはストリームの 種類によってはパフォーマンスが悪いことがあるので、可能な限りnextを使用してレコードを先頭からなめるようなアクセス方法で使用してください。

Record#getValueの注意

ストリームの種類によってはgetValueメソッドの返すValueオブジェクトのインスタンスがRecordをイテレートしても同じインスタンスを返すものがあります。
つまり次のようなコードは意図したとおりに動かない可能性があります。


    StreamDataObject is;
    Record record = is.getRecord();
    Value preValue = null;
    while (record != null) {
      Value v1 = record.getValue("Field1");
      Value v2 = record.getValue("Field2");
      if (preValue != null && preValue.equals(v1)) {
        //Field1の値が直前のレコードと同じだった場合に何かする
      }
      preValue = v1;
      record = record.nextRecord();
    }

この場合ストリームの種類によってはpreValueとv1が同じインスタンスになるので意図したとおりには動作しません。
(アクセスするフィールドが異なる場合は返されるValueインスタンスは同じにはなりません。上の例ではv1とv2が同じインスタンスになることはありません。)
上のようなことを行う場合、以下のように書きます。


    StreamDataObject is;
    Record record = is.getRecord();
    String preValue = null;
    while (is != null) {
      Value v1 = record.getValue("Field1");
      Value v2 = record.getValue("Field2");
      if (preValue != null && preValue.equals(v1.strValue())) {
        //Field1の値が直前のレコードと同じだった場合に何かする
      }
      preValue = v1.strValue();//Value#strValue()はnullを返すことはありません。
      record = record.nextRecord();
    }

7.3 StreamDataContainerの扱い

入力ストリームがコンテナ(StreamDataContainer)だった場合にそれがどのように扱われるかはInputConnector#setAcceptContainerとInputConnector#setExpandContainerの設定値によって決まります。

  true false
AcceptContainer コンテナを受け入れる 入力ストリームにコンテナがあった場合は
Exceptionとなる
ExpandContainer InputConnector#getInputStreamArrayを使用した時に
コンテナは展開されて取得される
コンテナはコンテナのまま取得される

コンポーネントがコンテナをどのように扱うかはそのコンポーネントの設計によりますが、可能な限りコンテナを受け入れるように設計してください。

8 出力ストリームの作成

8.1 ストリームプロパティとStreamFactory

ストリームプロパティはフローデザイナー上ではコンポーネントが作成する出力ストリームに対して設定されます。
コンポーネントが作成するストリームにはそこでの設定値が適用されている必要がありますが、ストリーム自身(StreamDataObjectのサブクラス)には ストリームプロパティに対するGetterメソッドはあっても、Setterメソッドはありません。
ストリームプロパティはStreamFactoryによって適用されます。 StreamFactoryはOutputConnectorのgetStreamFactoryメソッドによって取得されます。(ただし入力ストリームがそのまま出力されるようなコンポーネントではこのメソッドはnullを返します。)

ストリームを作成する方法には

の2つの方法があります。
ストリームクラスのコンストラクタを使用した場合は、ストリーム作成時点でストリームプロパティは設定されていません。(デフォルト値になっています。) この場合は作成後に明示的にStreamFactory#setPropertiesメソッドを使用してストリームプロパティを適用します。(SimpleComponent#setOutputStreamメソッドを使用した場合は、引数のストリームにストリームプロパティが適用されていなければ自動的にそのコンポーネントのStreamFactoryが適用されます。)
一部の特殊な作成方法が必要なストリーム以外は、そのFactoryクラスにコンストラクタと同じ引数を持つcreateメソッドがあるので、コンポーネントからストリームを作成する場合はできるだけFactoryのメソッドを使用することを推奨します。

ストリームに対するストリームプロパティの適用は1度しか行われません。
明示的にStreamFactory#setPropertiesメソッドを複数回使用した場合でも、2回目以降のメソッド呼び出しは何もしません。
つまりストリームプロパティの値は1度設定されるとその後値が変わることはありません。

このデザインはストリームの作成はコンポーネントの外側(例えばフローを外部から実行するクライアント)でも行えるが、それに対してプロパティとフィールド定義を設定するのはあくまでコンポーネントであることを意味しています。

8.2 Binaryストリーム

StreamDataBinaryはbyte[]を引数として作成します。

8.3 Textストリーム

StreamDataTextにはStringを引数として作成する方法ととbyte[]を引数として作成する方法があります。
(エンコーディングを指定しないbyte[]を引数とするコンストラクタでは、そのエンコーディングはストリームプロパティのエンコーディングとして解釈されます。byte[]のString化はstrValueメソッドが要求されるまで実行されません。つまりStreamFactoryの適用前にstrValueメソッドを呼び出してはいけません。)

8.4 HTMLストリーム

StreamDataHTMLの作成方法はStreamDataTextとすべて同じです。

8.5 CSVストリーム

StreamDataCSVの作成方法にはStreamDataTextと同じ作成方法の他に、java.util.Listを引数として作成するコンストラクタがあります。このメソッドを使用する場合、Listの内容はString[]のリストとなっていなければなりません。
またそこで使用するListはSerializableなクラスでなければなりません。(例えばArrayList#subListはSerializableでないListを返すので使用してはいけません。)

8.6 FixedLengthストリーム

FixedLengthに対応するストリームクラスはStreamDataFixedStringになります。
StreamDataFixedStringのコンストラクタにはbyte[]、String、Listをそれぞれ引数とする3つがあります。
byte[]を引数とするコンストラクタにはエンコーディングの指定はありません。FixedLengthのエンコーディング解釈は必ずStreamPropertyのエンコーディングで行われるからです。またStringを引数とした場合はエンコーディングでbyte[]に変換した後にパースされます。
Listを引数にするコンストラクタを使用する場合、Listの内容はValue[]のリストとなっていなければなりません。
またそこで使用するListはSerializableなクラスでなければなりません。

8.7 XMLストリーム

StreamDataXMLのコンストラクタにはbyte[]、String、org.w3c.dom.Documentをそれぞれ引数とする3つがあります。
byte[]を引数とするコンストラクタにはエンコーディングの指定はありません。XMLのエンコーディングはXML宣言内のエンコーディング指定により決定します。

Documentを引数としてStreamDataXMLを作成する場合、そのDocumentはNamespace awareなDocumentでなければなりません。
ASTERIA WARPではDOMのライブラリとしてXercesを使用していますが、Xercesでは Document#createElementを使用して 作成されたElementのgetLocalNameメソッドはnullを返します。
これはDOMの実装としては正しいですが、StreamDataXMLの引数となるDocumentではElementとAttributeは、getLocalNameメソッド でnullを返してはいけません。
コンポーネント内でDocumentを作成する場合は名前空間なしの場合でもcreateElementNSなどのNamespaceAPIを使用してください。

8.8 Recordストリーム

Recordストリームを作成する場合は通常はStreamDataRecordByListクラスを使用します。
このコンストラクタはjava.util.Listを引数にとります。Listの内容はValue[]のリストとなっていなければなりません。
またそこで使用するListはSerializableなクラスでなければなりません。

StreamFactoryRecordにはbyte[]を引数とするcreateメソッドが存在し、Recordストリームのバイナリ値をもとにストリームを作成することが出来ますが、このメソッドの使用は推奨されません。
Recordのバイナリ値はFileSystem(Put)コンポーネントでの出力など、主としてデバッグ用途のために用意されているものなので、そのフォーマットはASTERIA WARPのバージョンアップの際には変更になる可能性があります。フォーマット変更がある場合、その互換性については保証されません。

8.9 ParameterListストリーム

StreamDataParameterListのコンストラクタはFieldDefinitionを引数にとります。つまり、StreamFactoryと切り離して作成することは出来ません。

コンストラクタには初期値としてVariableListを渡すことの出来るコンストラクタも存在しますが、通常はStreamDataParameterListの作成後に、そのSetterメソッドを使用して値を設定します。

StreamFactoryParameterListのbyte[]を引数とするメソッドを使用するべきではないのはStreamDataRecordの場合と同じです。

8.10 MIMEストリーム

StreamDataMIMEの作成ではbyte[]とMIMEType(StreamDataMIME.Type)を引数として指定します。
引数となるbyte[]はMIMEとして正当なバイト列である必要があります。
MIMETypeには「HTTP」と「MIME」があり、そのMIMEをHTTPで使用する場合は「HTTP」をメールで使用する場合は「MIME」を指定します。

実際にはエンドユーザのレベルではMIMETypeについてほとんど気にする必要がありません。
StreamDataMIME#byteValueの返す値はMIMETypeの指定に関わらず、コンストラクタの引数となったbyte[]そのものです。
MIMETypeを使用するのはStreamDataMIMEが内部で使用するPartオブジェクトですが、Partオブジェクトの詳細についてはここでは記述しません。

8.11 ストリームコンテナ

ストリームコンテナとは型が同じ複数のストリームをひとつにまとめるためのものです。
例えばFileGetコンポーネントではFilePathに「*.csv」と指定することで複数のCSVファイルを一度に読み込むことができます。(LoopProcess=False)
例えばそれぞれ3行、5行、10行のデータを持つ3つのCSVをストリームコンテナにまとめて出力した場合、後続のMapperコンポーネントではそれがあたかも18行(3+5+10)のデータを持つ ひとつのCSVストリームであるかのように扱われます。
ストリームコンテナをどのように扱うかは各コンポーネントごとに異なります。中にはストリームコンテナを受け入れないコンポーネントもあるので扱いには注意が必要です。

ストリームコンテナを作成する場合はStreamDataContainerクラスを使用します。
StreamDataContainerにはaddStreamメソッドにより複数のストリームを追加することができますが、追加するストリームの型やフィールド定義が異なる場合はExceptionとなります。

9 ストリーム変数

ストリーム変数はフローの中では以下のような使われ方をします。

  1. コンポーネントが出力ストリームに対して固定の変数名で値を設定する。
    Ex.) FileSystem(Get)のFilePathなど
  2. コンポーネントが入力ストリームに対して固定の変数名で値があることを期待して、それがある場合はその値を使用する。
    Ex.) AttachedMailのFilePathなど
  3. 入力ストリームに対して任意のストリーム変数を設定する。(StreamVariablesコンポーネント)

ユーザ作成のコンポーネントがストリーム変数を使用するケースは1、または2です。
ストリーム変数を取得/設定するには、StreamDataObject#getStreamVariable/putStreamVariableメソッドを使用します。

10 Exception

コンポーネントの実行中に何らかの例外が発生した場合はFlowException(のサブクラス)をthrowします。
通常、コンポーネントがExcepitionを投げる場合はComponentExceptionまたはComponentExceptionByMessageCodeを作成してthrowします。

FlowExceptionのサブクラスには他にStreamで例外が発生した場合のStreamException、Connectionで例外が発生した場合のFlowConnectionExceptionなどがあります。
これらのExceptionはコンポーネント開発者が意図しない場合でも発生する可能性があります。例えばXMLストリームを出力するコンポーネントではユーザが「Validate=True」とプロパティを設定した場合にはValidationErrorのStreamExceptionが発生する可能性があります。

10.1 メッセージ

エラーメッセージなどのリソースはソースファイル中に直接書くことも出来ますが、xscファイル中にMessage要素として記述することでソースと分離することが出来ます。
xscファイル中に記述したメッセージはComponent#getMessageメソッドにより取得できます。
xscファイルの記述方法については「xscファイルの記述」の章を参照してください。

10.2 ExceptionParam

システム変数の「ExceptionParam1〜5」に値を設定する場合はFlowException#addParamメソッドを使用して値を設定します。
このメソッドで設定されたnameとvalueが、設定された順にそれぞれシステム変数の「ExceptionParamName1〜5」「ExceptionParam1〜5」に対応します。

FlowExceptionに対するaddParamはComponent#setExceptionParam(FlowException)メソッドをオーバーライドして、その中で行うようにして下さい。このメソッドはthrowされたExceptionがキャッチされ、Exceptionフローに渡される直前に実行されます。
Exceptionをthrowする前にaddParamしてからthrowすることも出来ますが、先にも述べた通りコンポーネント実行中に発生するExceptionはコンポーネント開発者自身がthrowしたものだけとは限らないので、その方法では、あるExceptionの場合にはExceptionParamが設定されているが、別のExceptionの場合にはパラメータが設定されていないという望ましくない情況が発生する可能性があります。

10.3 ストリーム

FlowException#setStreamというメソッドを使用することによりFlowExceptionにストリームを関連付けることができます。
ここで設定されたストリームはExceptionフローの入力ストリームとなります。
FlowExceptionにストリームが関連付けられていない場合は、Exceptionフローへの入力ストリームはコンポーネントの入力ストリームとなります。

このメソッドはExceptionをthrowする前に呼び出しても前述のComponent#setExceptionParamメソッド内で呼び出しても構いません。
Exceptionフローへの入力ストリームを何にするかはコンポーネントの設計次第です。

10.4 State

FlowExceptionにはStateというExceptionの種類を表すプロパティがあります。特に設定しない限りStateの値はFlowException#STATE_DEFAULT(=0)という値になります。
StreamExceptionなどのコンポーネント開発者の意図と関係なく発生する可能性のあるExceptionのStateはすべてFlowException#STATE_DEFAULTとなっています。

標準で実装されているコンポーネントにはすべて「Exception」という名前のExceptionPropertyが実装されていますが、このExceptionPropertyではあらゆるStateのExceptionをキャッチします。
それとは別に特定のStateのExceptionだけをキャッチするExceptionPropertyを新たに追加することも可能です。

例えばFileGetコンポーネントでは「State=1」のExceptionとしてFileNotFoundExceptionが定義されています。
コンポーネントの実装の中ではファイルが見つからなかった場合はState=1のExceptionを、それ以外のエラーではState=STATE_DEFAULTのExceptionがthrowされます。
State=1のExceptionはFileNotFoundExceptionプロパティにフローが設定されていればそちらでキャッチされ、それがない場合にExceptionプロパティが設定されていればそちらでキャッチされます。
つまりコンポーネントにStateの異なる複数のExceptionPropertyを作成することで、Exceptionの種類に応じて異なるExceptionフローを実行させるようなコンポーネントを作成することが出来ます。

コンポーネント内でthrowされたExceptionがExceptionPropertyによってどのようにハンドルされるかについてはユーザーズガイドのExceptionの項もあわせて参照してください。

11 ループ

Component#executeメソッドがfalseを返した場合、そのコンポーネントはループスタックに入れられループ処理待ち状態になります。
ループを実装するコンポーネントでは、さらに以下の2つのことを行います。

executeLoopメソッドの実装については「コンポーネントのメソッド」の章を参照してください。

loopPossibilityメソッドはコンパイル時にそのコンポーネントがループする可能性があるか否かをチェックするために使用されます。
例えば「LoopProcess」というBoolean型のプロパティの値によってループが発生するか否かの可能性が決まるコンポーネントでは以下のように実装します。

    private BooleanProperty loopProp = new BooleanProperty("LoopProcess", false, false, false);
    
    ...
    
    public boolean loopPossibility() {
        return loopProp.booleanValue();
    }
    

12 Connectionの使用

フローサービスではRDB、SMTP、HTTP、FTPなどの接続情報はConnectionとして定義され、ASMCにより設定されます。
ここではRDBConnectionの使用方法について説明します。それ以外のConnectionの使用方法についてはここでは記述しません。

RDBConnectionを使用するためにはConnectionPropertyを、RDBConnectionとして作成します。
そして、ExecuteContext#getConnectionメソッドによりConnectionを取得し、さらにそれをRDBConnectionにキャストしてから、RDBConnection#getConnectionメソッドによりjava.sql.Connectionを取得します。

java.sql.Connectionを取得した後はそれを使用して自由にJDBCのプログラミングを行って構いませんが、Connection#closeメソッドを実行してはいけません。何故なら取得したConnectionはフロー中の別コンポーネントによって使用される可能性があり、さらにはコネクションプールによってそれ以上に再利用される可能性があるからです。

また、Connection#commit/Connection#rollbackメソッドも実行するべきではありません。これらはフローのトランザクションフレームワークの中で実行されるべきメソッドだからです。
逆にRDBConnectionを使用するコンポーネントではConnectionをTransactionManagerに追加する処理をexecuteおよびexecuteLoopメソッド内に組み込むべきです。(次章で詳しく説明します。)

12.1 サンプル3


import com.infoteria.asteria.flowengine2.execute.ExecuteContext;
import com.infoteria.asteria.flowengine2.flow.InputConnector;
import com.infoteria.asteria.flowlibrary2.FlowException;
import com.infoteria.asteria.flowlibrary2.component.SimpleComponent;
import com.infoteria.asteria.flowlibrary2.component.ComponentException;
import com.infoteria.asteria.flowlibrary2.stream.*;
import com.infoteria.asteria.flowlibrary2.property.*;
import com.infoteria.asteria.flowengine2.connection.RDBConnection;
import com.infoteria.asteria.connection.RDBConnectionEntry;
import com.infoteria.asteria.value.Value;
import java.sql.*;
import java.util.ArrayList;

/**
 * 指定のTableのカラム情報をCSVまたはRecord型の出力ストリームとして出力するコンポーネント<br/>
 * コンポーネント内での作成するストリームのフィールド数は固定(カラム名、データ型の2列)<br/>
 * 配置したコンポーネントではそれにあわせてフィールド定義を行わなければならない。(フィールド名は任意)<br/>
 * Tableが見つからない場合のExceptionは通常のExceptionとは別に「TableNotFoundException」という
 * プロパティでキャッチできることとする。
 */
public class TableInfoComponent extends SimpleComponent {
    
    public static final String COMPONENT_NAME = "TableInfo";
    public String getComponentName() { return COMPONENT_NAME;}
    
    private static final int TABLE_NOT_FOUND = 1;
    
    /**
     * このコンポーネントが宣言されているmscファイルに
       <Message key="1">テーブルが見つかりません: %1</Message>
       <Message key="2">出力ストリームのフィールド定義が不正です</Message>
       というエントリを追加すること
     */
    private static final String MSG_TABLE_NOT_FOUND  = "1";
    private static final String MSG_INVALID_FIELDDEF = "2";
    
    private ConnectionProperty propConnection = new ConnectionProperty(RDBConnectionEntry.TYPE, "Connection");
    private StringProperty propTableName = new StringProperty("TableName", true, true);
    private ExceptionProperty propTableNotFound = new ExceptionProperty("TableNotFoundException", TABLE_NOT_FOUND);//第2引数には0以外の任意の数値を指定
    
    private RDBConnection _rcon = null;
    
    public TableInfoComponent() {
        //入力ストリームは使用しないので、すべてのストリームフォーマットを複数受け入れ可能とする
        //FileSystem(Get)などの入力ストリームを使用しない標準コンポーネントはすべてこのようにデザインされている
        getInputConnector().setAcceptLinkCount(InputConnector.LINK_UNBOUNDED);
        //getInputConnector().setAcceptContainer(true);
        //getInputConnector().setExpandContainer(false);
        getInputConnector().setAcceptType(StreamType.ALL);
        
        //出力ストリームはCSVまたはRecord
        getOutputConnector().setAcceptType(StreamType.CSV|StreamType.RECORDS);
        
        //プロパティの登録
        registProperty(propConnection);
        registProperty(propTableName);
        registProperty(propTableNotFound);
        //「Exception」という名前のExceptionPropertyはSimpleComponentクラスでregistされている。
    }
    
    /** コンポーネント初期化時にRDBConnectionを取得 */
    public void init(ExecuteContext context) throws FlowException {
        _rcon = (RDBConnection)context.getConnection(propConnection);
        //propConnectionで指定されたConnectionが取得できない場合はここでFlowConnectionException( extends FlowException)が発生する
        //必要ならそのExceptionは「Exception」プロパティで指定したフローによってキャッチする。
    }
    
    public void term(ExecuteContext context) {
        _rcon = null;
        //現状ではこの処理は必須ではないができるだけインスタンス変数は初期状態に戻すことが望ましい。
    }
    
    public boolean execute(ExecuteContext context) throws FlowException {
        Connection con = _rcon.getConnection();
        String table = propTableName.strValue();
        
        try {
            //DatabaseMetaDataを使用してテーブル情報を取得しても良いが、ここではResultSetMetaDataからTable情報
            //を取得することにした。
            Statement stmt = con.createStatement();
            ResultSet rs = null;
            try {
                String sql = "SELECT * FROM " + table + " WHERE 1 = 2";
                rs = stmt.executeQuery(sql);
            } catch (SQLException e) {
                //ここでキャッチされた例外はテーブルが見つからなかった場合と思われる。
                //ここでthrowするExceptionだけStateを「1」に。
                throw new ComponentException(getMessage(MSG_TABLE_NOT_FOUND, table), TABLE_NOT_FOUND);
            }
            ResultSetMetaData meta = rs.getMetaData();
            StreamFactory factory = getOutputConnector().getStreamFactory();
            //定義されたフィールド数が2列でなければExceptionとする。
            if (factory.getFieldDefinition().getFieldCount() != 2)
                throw new ComponentException(getMessage(MSG_INVALID_FIELDDEF));
            
            StreamDataObject os = null;
            switch (factory.getType()) {
                case StreamType.CSV:     os = createCSV(meta); break;
                case StreamType.RECORDS: os = createRecord(meta); break;
                default:
                    //このコードが実行されることはない
                    throw new IllegalStateException();
            }
            setOutputStream(os);
            //使用したコネクションをTransactionManagerに追加する
            context.addTransaction(context.getConnectionTransaction(propConnection));
        } catch (SQLException e) {
            throw new ComponentException(e);
        }
        return true;
    }
    
    /** CSVストリームの作成 */
    private StreamDataObject createCSV(ResultSetMetaData meta) throws FlowException, SQLException {
        int len = meta.getColumnCount();
        //CSV文字列の作成
        StringBuffer buf = new StringBuffer(512);
        for (int i=1; i<=len; i++) {
            buf.append(meta.getColumnName(i)).append(',');
            buf.append(meta.getColumnTypeName(i)).append('\n');
        }
        
        StreamFactoryCSV factory = (StreamFactoryCSV)getOutputConnector().getStreamFactory();
        return factory.create(buf.toString());
    }
    
    /** Recordストリームの作成 */
    private StreamDataObject createRecord(ResultSetMetaData meta) throws FlowException, SQLException {
        int len = meta.getColumnCount();
        //Record用のListの作成
        ArrayList list = new ArrayList(len);
        for (int i=1; i<=len; i++) {
            Value[] values = new Value[2];
            values[0] = new Value(meta.getColumnName(i));
            values[1] = new Value(meta.getColumnTypeName(i));
            list.add(values);
        }
        
        StreamFactoryRecord factory = (StreamFactoryRecord)getOutputConnector().getStreamFactory();
        return factory.create(list);
    }
    
    /** ExceptionParamとして「TableName」を設定 */
    public void setExceptionParam(FlowException e) {
        e.addParam(propTableName.getName(), propTableName.getValue());
    }
    
}

13 トランザクション

トランザクション処理はTransactionインターフェースを介して行われます。 Transactionインターフェースは以下に示すようにcommitメソッドとrollbackメソッドを持つインターフェースです。

package com.infoteria.asteria.flowengine2.execute;

import com.infoteria.asteria.flowlibrary2.FlowException;

public interface Transaction {
	
	public void commit(ExecuteContext context) throws FlowException;
	public void rollback(ExecuteContext context) throws FlowException;
}

ExecuteContext#addTransactionメソッドを使用することによりTransactionManagerにTransaction処理を積み上げていくことができます。

BeginTransaction=Falseの場合はコンポーネントをひとつ実行する度にTransactionはcommitされ、
BeginTransaction=Trueの場合はトランザクション化されたフローの終了時にそこまでに積まれたTransactionがまとめてcommitされます。

親フローのBeginTransaction=Falseの場合はサブフローだけをトランザクション化してその単位でcommitまたはrollbackすることもできます。
また、BeginTransaction=Falseの場合はトランザクションがロールバックされてからExceptionフローがスタートします。

13.1 トランザクションを処理するコンポーネントの作成方法

executeまたはexecuteLoopメソッド内でExecuteContext#addTransactionメソッドを実行することで、 必要なTransactionをTransactionManagerに追加することができます。

このメソッドはexecute処理内で複数回実行することができますが、同じTransactionインスタンスを複数回addしても同一インスタンスは1度だけしかTransactionManagerに追加されません。
この仕様はフローでループが発生する場合に大きな意味を持ちます。
トランザクションの仕組みを理解するためにトランザクション処理の内容としてログ出力を行う次のような2つのコンポーネントを考えてみます。

/**
 * Transactionとして追加されるのはComponent自身。
 * つまりループの中で複数回コンポーネントが実行される場合は毎回同一インスタンスが
 * ExecuteContext#addTransactionに渡される
 */
class ComponentA extends SimpleComponent implements Transaction {
	
	public String getComponentName() { return "ComponentA";}
	
	public boolean execute(ExecuteContext context) throws FlowException {
		context.addTransaction(this);
		passStream();
		return;
	}
	
	public void commit(ExecuteContext context) throws FlowException {
		//ExecuteContext#infoはログに情報を出力するメソッド
		context.info("ComponentA commit");
	}
	
	public void commit(ExecuteContext context) throws FlowException {
		context.info("ComponentA rollback");
	}
}

/**
 * 追加するTransactionを毎回 new しているので常に異なるインスタンスとなる
 */
class ComponentB extends SimpleComponent {
	
	public String getComponentName() { return "ComponentB";}
	
	public boolean execute(ExecuteContext context) throws FlowException {
		context.addTransaction(new MyTransaction());
		passStream();
		return;
	}
	
	private static class MyTransaction implements Transaction {
		public void commit(ExecuteContext context) throws FlowException {
			context.info("ComponentB commit");
		}
		
		public void commit(ExecuteContext context) throws FlowException {
			context.info("ComponentB rollback");
		}
	}
}

この2つのコンポーネントを用いて次のようなフローを書いたとします。

  Start(BeginTransaction=True)
    ↓
  LoopStart(LoopCount=3)
    ↓
  ComponentA
    ↓
  ComponentB
    ↓
  End

するとログへの出力結果は次のようになります。

  情報(End1) : ComponentA commit
  情報(End1) : ComponentB commit
  情報(End1) : ComponentB commit
  情報(End1) : ComponentB commit

トランザクションのコミットはフローの終了時(Endコンポーネント実行後)に一度だけ行われています。
ComponentA、ComponentBともにループの中で3回実行されていますが、ComponentAが積んだTransactionはひとつだけであり、 ComponentBは3つのTransactionを積み上げていることがわかります。

ちなみにこのフローをStart(BeginTransaction=False)として実行すると、結果は以下のようになります。

  情報(ComponentA1) : ComponentA commit
  情報(ComponentB1) : ComponentB commit
  情報(ComponentA1) : ComponentA commit
  情報(ComponentB1) : ComponentB commit
  情報(ComponentA1) : ComponentA commit
  情報(ComponentB1) : ComponentB commit

BeginTransaction=Falseの場合はコンポーネント実行直後にTransactionがcommitされるので、 ComponentAが2回目に実行される際には以前に追加したTransactionは残っておらず、 再度ComponentA自身がTransactionとして追加されているのです。

13.1.1 コネクションを使用しているコンポーネントでのTransaction処理

RDBConnectionを使用するコンポーネントは、「RDBGet」「RDBPut」「SQLCall」など複数存在します。
それらのコンポーネントはひとつのフロー内に複数配置されることがありえるわけですが、 BeginTransaction=Trueのフローではそれらのコンポーネント群のトランザクション処理はRDBConnectionに対して1度だけ行われれば良いことになります。

コンポーネント側の処理としては使用しているコネクションに対応する単一のTransactionインスタンスを取得し、それをTransactionManagerに追加すればOKです。
別のRDBコンポーネントが同じコネクションを使用していてもTransactionインスタンスが重複するためTransactionManagerによって破棄されます。

	private ConnectionProperty _connectionProp = new ConnectionProperty(RDBConnectionEntry.TYPE, "Connection");
	
	/**
	 * executeメソッドの先頭で使用しているConnectionに対応するTransactionを追加
	 */
	public boolean execute(ExecuteContext context) throws FlowException {
		context.addTransaction(context.getConnectionTransaction(_connectionProp));
		...
	}
	
	/*
	//executeLoopメソッドが実装されている場合は同様にTransactionの追加をメソッドの先頭で行う
	public int executeLoop(ExecuteContext context) throws FlowException {
		context.addTransaction(context.getConnectionTransaction(_connectionProp));
		...
	}
	*/

14 サンプル

サンプルとして標準で提供されているコンポーネントのいくつかのソースを公開します。

15 その他のトピック

15.1 拡張プロパティの作成

標準で提供されているPropertyを使用する以外に、自分でプロパティを拡張して使用することができます。

多いのはCategoryPropertyのサブクラスを作成するケースです。
CategoryPropertyではフローデザイナー上のタブページに表示する列を何列でも自由に定義することができます。 つまり表形式のプロパティ(例. SQLCallコンポーネントのSQLParameter)を定義することが出来ます。 (ただしMapperで値をマッピングできる項目はそのうちのひとつだけです。)
自作のCategoryPropertyを作成した場合その情報はプロジェクトファイル中で以下のようになります。


    <fe:Property name="Param">
        <fe:Row Name="p1" Type="String" Default="aaa"/>
        <fe:Row Name="p2" Type="String" Default="aaa"/>
    </fe:Property>

プロパティ上の1行がRow要素に対応し、列情報は属性として設定されます。
CategoryPropertyのcompileメソッドにはRow要素が引数として渡されるので、それをオーバーライドして、 コンパイル時のプロパティ設定を行います。

それ以外に拡張プロパティを作成するケースとしては以下のようなケースが考えられます。

15.2 拡張コンパイラの作成

コンポーネントのコンパイル時の流れは以下のようになります。

  1. ComponentManagerよりコンポーネントのインスタンスを取得
  2. Component#getCompilerメソッドによりComponentCompilerを取得
  3. ComponentCompilerにComponent要素を渡してコンパイル

自分でComponentCompilerのサブクラスを作成し、Component#getCompilerメソッドでそれを返すようにする ことでCompilerを拡張することが出来ます。
拡張コンパイラを作成することで以下のようなことが行えます。

15.3 cloneメソッドのオーバーライド

フローの実行がリクエストされるとまずプロジェクトプールにコンパイル済みのプロジェクトがあるかどうかが チェックされます。プールにコンパイル済みプロジェクトがない場合はプロジェクトファイルがコンパイルされ、 コンパイル済みのプロジェクトがプールに入れられます。
そしてこのコンパイル済みのプロジェクトから実行するフローをcloneすることで、リクエストで使用するComponent が取得されます。

フローのclone時には、そのすべてのコンポーネントのcloneメソッドが呼び出されます。
Componentクラスのcloneメソッドの実装の一部を以下に示します。


    public Object clone() {
        Component ret = null;
        try {
            ret = (Component)getClass().newInstance();
        } catch (IllegalAccessException e) {
        } catch (InstantiationException e) {
        }
        ret.setName(getName());
        int cnt = ret.getPropertyCount();
        for (int i=0; i < cnt; i++) {
            Property prop = ret.getProperty(i);
            prop.assign(getProperty(i));
        }
        ...
        return ret;
    }

通常のcloneとは異なり、ComponentのcloneはClass#newInstanceメソッドを用いて実装されています。
つまりclone時にコンストラクタが実行されます。
その後登録されているプロパティの値をコピーすることで複製を作成します。
逆から言えば登録されているプロパティ以外のインスタンス変数はコピーされません。

拡張コンパイラを使用してコンパイル時にインスタンス変数に何らかの値を設定した場合はComponent#clone メソッドをオーバーライドして、そのコピーを自分で行う必要があります。
(cloneメソッド内では、super.clone()を必ず呼び出す必要があります。)