フローサービスでは独自のコンポーネントをJavaで開発し追加することが可能になっています。
コンポーネント開発を行う場合、以下の作業が必要になります。
Componentクラスが実際にサーバー上で動作する機能そのものになります。
一方デザイナー側ではこのクラスは使用されず、定義ファイルの情報だけからコンポーネントのアイコンやインスペクタに表示するプロパティの情報を取得します。
それらをまとめたものがjarファイルとなるわけですが、実際にはjarファイルの作成はJavaInterpreterコンポーネント付属のSDKウィザードで作成されるANTのbuild.xmlによってほぼ自動化されています。
これ以外にもデザイナーのプラグインを作成することでUIの拡張を行うこともできますが、ここではサーバー側のComponentクラスのソースコードの書き方を中心に説明します。
定義ファイルの詳細に関しては定義ファイルリファレンスを、
jarファイルの作成とそのインストールに関してはツールガイドを参照してください。
コンポーネントの開発はJDK8.0以降の環境で行ってください。
また、「DESIGNER_HOME/lib」にある以下のjarファイルにクラスパスを通してください。
※ascore-1610.0200.jarと asdesigner-1610.0200.jarの「-1610.0200」の部分はインストールしたバージョンにより異なります。
作成するコンポーネントの内容によってはさらに別のjarファイルがコンパイル/実行に必要になる場合があります。
その場合、flow-ctrlのshowlibコマンドを使用することで必要とするクラスがどのjarに含まれているかを確認することができます。
//showlibコマンドによって指定のクラスがどのjarに含まれているかが表示されます。 >showlib com.infoteria.asteria.flowlibrary2.stream.StreamDataXML jar:file:C:\Program Files\asteria5\server\lib\ascore-1610.0200.jar!/com/infoteria/asteria/flowlibrary2/stream/StreamDataXML.class
必要に応じてここで表示されたjarファイルにもクラスパスを通してください。
実際にはウィザードで生成されるANT用のbuild.xmlでは「DESIGNER_HOME/lib/**/*.jar」にクラスパスが通っているので、通常はbuild.xmlの設定を変更する必要はありません。
フローサービスの中での役割から言えばコンポーネントとはフローにおける処理の単位です。
コンポーネントでは例えば次のような処理が行われます。
およそコンピュータ上で実現可能な処理はすべてフローサービスのコンポーネントとして実装することが可能です。
実装面から言えばコンポーネントとは「com.infoteria.asteria.flowengine2.flow.Component」のサブクラスのことです。
このクラスの各種メソッドをオーバーライドすることでコンポーネントは作成されます。
実装の観点からコンポーネントを考えた場合、以下のように分類することができます。
SDKでは開始コンポーネント、終了コンポーネント以外のコンポーネントを作成することができます。
コンポーネント開発ではComponentクラスを継承し機能を拡張していくことになります。
(実際には分岐コンポーネント以外ではComponentを継承した抽象クラスであるSimpleComponentクラスを継承して開発します。)
コンポーネント内部では様々なクラスが使用されていますが、ここでは先の分類に対応する形でコンポーネントを構成する主要クラスについて説明します。
入力コネクタセットは入力ストリームの受け口です。
デザイナーのアイコン上ではアイコン上部にあるコネクタがこれに相当します。
開始コンポーネント以外のコンポーネントは必ずひとつの入力コネクタセットを持ちます。
そして入力コネクタセットは必ずひとつのデフォルトコネクタと0個以上のサブコネクタを持ちます。
上記の分類に沿って考えればサブコネクタをひとつも持たないコンポーネントがシンプルコンポーネントであり、
サブコネクタを持つコンポーネントが複数入力のあるコンポーネントです。
(サブコネクタを複数持つコンポーネントとしてはメールコンポーネントやExcelOutputコンポーネントがあります。)
通常、入力コネクタにはひとつのリンクのみ接続可能ですが、Velocityコンポーネントのように複数のリンクを接続できる(= 複数の入力ストリームを扱える)ようにすることもできます。
コンポーネントは入力コネクタにストリームがセットされてはじめて実行可能な状態になるので、コンポーネント実行時には少なくとも一つ以上の
ストリームがデフォルトコネクタにセットされています。
ただし、サブコネクタには入力ストリームがセットされていないこともあります。(= サブコネクタにはリンクが接続されていなくとも構いません。)
出力コネクタはストリームの出口です。
終了コンポーネント以外のコンポーネントはひとつ以上の出力コネクタセットを持ちます。
出力コネクタセットを複数持つコンポーネントは上記分類の分岐コンポーネントになります。
つまりそれぞれの出力コネクタセットが分岐に対応し、実行時にはいずれかひとつの出力コネクタセットからストリームが出力されます。
出力コネクタセットは入力コネクタセットと同様にひとつのデフォルトコネクタと0個以上のサブコネクタを持ちます。
サブコネクタを持つコンポーネントが複数出力のあるコンポーネントです。
(サブコネクタは複数持つコンポーネントとしてはRecordFilterコンポーネントやExcelInputコンポーネントがあります。)
コンポーネントはその実行時に必ず使用する出力コネクタセットのデフォルトコネクタにストリームをセットしなければなりません。
使用する出力コネクタがサブコネクタを持つ場合はそちらへのストリームのセットは必ずしも必須ではありません。
(サブコネクタにストリームがセットされなかった場合はそこから続く後続のフローは実行されません。)
それぞれの出力コネクタにはストリームファクトリを持ちます。
ただし入力ストリームをそのまま出力するだけの出力コネクタはストリームファクトリを持ちません。
ストリームファクトリはデザイナー上で定義されたストリームプロパティとフィールド定義を保持し、ストリームを生成するためのFactoryクラスとなります。
ストリームファクトリは出力コネクタ毎に定義することができます。
例えばExcelInputコンポーネントではExcelBuilder上で定義したレコード定義がそれぞれの出力コネクタのストリームファクトリに設定されます。
ここまでに説明された概念をクラス図で示します。
(このクラス図はコンポーネント構成要素の説明のみを目的としているのでそれ以外の多くのメソッドやフィールドが省略されています。)
Componentクラスはフローサービス内で実行されるすべてのコンポーネントの抽象基底クラスです。
フローの実行エンジンは作成されたフローからComponentの実行順序を解析して、それを順番に実行していきます。
Componentではいくつかのabstractメソッドと何もしない空メソッドが定義されています。
abstractメソッドとして定義されたメソッドは主に先の章で説明したコンポーネントの構成要素を実装する部分と、
実際にコンポーネントが行う処理を実装するexecuteメソッドなどです。
空メソッドとして定義されたメソッドはループや初期化などの必要な場合にのみ実装すれば良いメソッドです。
それらのメソッドをオーバーライドして適切に実装していくことでカスタムコンポーネントは完成します。
入出力コネクタがひとつだけのシンプルなコンポーネントを作成する場合は、Componentを1段階凡化したSimpleComponentクラスを
継承して作成することでさらに簡単にコンポーネントを作成することができます。
(実際にフローサービスの標準コンポーネントは分岐コンポーネント以外すべてSimpleComponentを継承して作成されています。)
/** * コンポーネントの実行コードを記述します */ public abstract boolean execute(ExecuteContext context) throws FlowException;
コンポーネント開発者にとって最も重要な意味を持つメソッドはexecuteメソッドです。
実際の所ループを処理しない単純なコンポーネントを作成する場合は実装するメソッドはほとんどこのメソッドのみとなります。
このメソッドでは実際にそのコンポーネントが行う「処理」を記述します。
標準コンポーネントとして例えば次のような処理が実装されています。
ほんの数行のコードで実現可能な処理から、一般のSIerには馴染みの無いような複雑な処理まで、Javaで可能なことであればどんな処理であっても コンポーネント化することができます。
executeメソッドの返り値はそのコンポーネントがループの起点となるかどうかを示すbooleanです。
そのコンポーネントがループの起点となる場合はfalseを返します。
そうすることでフローの実行エンジンはそのコンポーネントをループ用のスタックに積みます。(つまりループします。)
例えばCSVファイルを指定行数ずつ読み込むコンポーネントであるRecordGetコンポーネントでは指定の行数を読み込んだあとに
データがまだ残っている場合は残りのデータをループで処理するのでfalseを返します。
ループを処理しないコンポーネントでは常にtrueを返せばOKです。
/** ループ処理の実行コードを記述します */ public int executeLoop(ExecuteContext context) throws FlowException { return LOOP_END; }
ループの起点となるコンポーネントで実際にループ時の処理を記述するメソッドがexecuteLoopメソッドです。
上記はComponentクラスで実装されているexecuteLoopメソッドのデフォルトの実装です。
何もせずにLOOP_ENDを返しています。
ループの起点となることがないコンポーネントではこのメソッドをオーバーライドする必要はありません。
ループの起点となるコンポーネントではこのメソッドをオーバーライドしてループ時の処理を記述します。
メソッドの返り値はint型でありComponentクラスで定義されている以下のシンボルのうちのいずれかでなければなりません。
LOOP_END | Loop処理を実行し、Loopは終了した |
---|---|
LOOP_CONTINUE | Loop処理を実行し、Loopは継続する |
LOOP_NOTHING | Loop処理は実行されなかった |
メソッドの実行によりループで処理すべき内容がすべて完了した場合にはLOOP_ENDを返し、
さらにループによってこのコンポーネントに戻ってきて欲しい(=まだ処理すべきデータがある)場合には
LOOP_CONTINUEを返します。
LOOP_NOTHINGはほとんど使用することはありませんが、前回の実行でLOOP_CONTINUEを返したにも関わらず、
実際にはループ処理が必要がなかった場合にこの値を返します。
LOOP_NOTHINGが返された場合、コンポーネントはループの起点とはなりません。つまり、前回の実行で
LOOP_ENDが返されたのと同じことになります。
例えばJDBCのResultSetをnextメソッドでイテレートしながら処理を行うような場合、nextメソッド
を実行するまで次のデータがあるかどうかわからないので、LOOP_NOTHINGを返す必要がある場合があります。
注意が必要なのはexecuteLoopメソッドはコンポーネントがループの起点となっている場合に処理を行うメソッドであり、 ループの中でコンポーネントが実行される場合に呼び出されるメソッドはexecuteメソッドであるという点です。
左記のフローでは回数を指定してループするコンポーネント(LoopStartコンポーネント)で3回のループを行っています。
このフローではLoopStartコンポーネントは最初の実行ではexecuteメソッドが実行され、2回目、3回目の実行ではexecuteLoopメソッドが実行されます。
ループ内で実行されるFileGetコンポーネントは3回ともexecuteメソッドが実行されます。
LoopStartコンポーネントの前にさらに別のLoopStartコンポーネントを置く(つまりループをネストさせる)こともできますが、 その場合コンポーネント自身がループの起点となっている場合以外に実行されるメソッドはexecuteメソッドです。
コンポーネントのインスタンスはリクエストを受けてフローが初期化されたタイミングで生成されます。
この時、コンポーネントにはデザイナーで定義されたプロパティがすべてセットされています。(Mapperによって実行時に値を差し込まれることはあります。)
コンポーネントのインスタンスはデザイナー上でのアイコン単位で作成されます。
つまり同種のコンポーネントを複数フロー上に配置した場合はそれらは別のインスタンスとなりますが、
ループで同じコンポーネントが複数回実行される場合は同一インスタンスなので、そのexecuteメソッドは複数回実行されることになります。
サブフロー(とそれに含まれるコンポーネント)のインスタンスは最初にサブフローが実行される時に生成されます。
同一フローを参照するサブフローコンポーネントが複数ある場合はそれごとにインスタンスが生成され、ループの際には同一インスタンスが複数回実行される
点は通常のコンポーネントと同じです。
これがコンポーネント開発者にとってどういう意味を持つかと言うと、コンポーネントの状態をメンバー変数に保存しておいても構わないと言うことです。
executeメソッドで保存した状態をexecuteLoopメソッドで参照して処理を行うことには何も問題はありません。
例えばLoopStartコンポーネントはexecuteメソッド内でプロパティ値から取得したループ回数をメンバー変数に設定し、 executeLoopメソッドでそれをデクリメントしています。
それ以外にもComponentクラスにはフローの実行の過程でコールバックされるメソッドがいくつかあります。
標準ではそれらは何もしない空メソッドとして実装されていますが、必要に応じてオーバーライドすることによって
コンポーネントの動作をより細かく制御することができます。
/** 初期化処理を記述します */ public void init(ExecuteContext context) throws FlowException { }
initメソッドはコンポーネントで初期化処理を行いたい場合に実装します。
initメソッドはコンポーネントが最初に実行される直前に実行されます。
ループの中で複数回execute/executeLoopが実行されるコンポーネントであってもinitが実行されるのは1度だけです。
ブランチにより1度も実行されないコンポーネントではinitも実行されません。
initメソッドではexecuteやexecuteLoopメソッドで繰り返し使用するオブジェクトをメンバー変数に設定しても構いません。
例えばRDBGetコンポーネントではDB接続に使用するコネクションをinitメソッド内で取得します。
RDBへのConnectionはフローの実行途中で変わることがないのでinitメソッド内で初期化するのが最も効率が良いためです。
逆に言えばマッピングによって実行途中に値が変わる可能性のあるプロパティに対する処理はinitメソッド内で行うべきではありません。
/** 終了処理を記述します */ public void term(ExecuteContext context) { }
termメソッドはコンポーネントで終末処理を行いたい場合に実装します。
メインフローの終了直前にinitメソッドが実行されたすべてのコンポーネントのtermメソッドが順番に実行されます。
フローがExceptionにより終了した場合でもtermメソッドはそれまでに実行したすべてのコンポーネントについて実行されます。
フローの中でサブフローやExceptionフローが実行された場合は、それらに含まれるコンポーネントのtermメソッドもメインフローの終了時にあわせて実行されます。
initメソッドの実行が完了しなかった場合(FlowExceptionをthrowした場合)はtermメソッドやendFlowメソッドは実行されないことに注意してください。
initとtermは対で実装されることが多い(initで初期化したオブジェクトをtermでクリーンアップするなど)のですが、initで中途半端にオブジェクトが初期化された
ままExceptionとなるとtermは実行されませんので、その場合はinitメソッド内で必要なクリーンアップを行ってからExceptionをthrowする必要があります。
またNextフローがある場合はtermメソッドはNextフローの実行よりも前に実行されます。(Nextフローはフローの終了時に続けて次のフローを実行する機能なので、
それが実行される時には先に実行したフローは完全に終了しています。)
終末処理をリクエスト終了時に実行したい場合はtermメソッドを実装するのではなく、IndependentMapの章で説明されているReleasableインターフェースを使用してください。
/** 個別のフロー実行終了時の処理を記述します */ public void endFlow(ExecuteContext context) { }
endFlowメソッドはコンポーネントでフロー終了時の処理を行いたい場合に実装します。
ひとつのフローの実行が終了する度に、そのフロー内で実行されたコンポーネントのendFlowメソッドが順番に実行されます。
termメソッドとは以下のような違いがあります。
フローがトランザクション化されている場合はendFlowメソッドはTransactionManagerのcommitまたはrollback後に実行されます。
/** * コンポーネントのキャンセル処理を記述します。 * キャンセル処理が正しく行えた場合はtrueを返します。 * (デフォルトの実装では常にfalseを返します。) */ public boolean cancel() { return false; }
cancelメソッドではコンポーネントの強制終了処理を実装します。
cancelメソッドはフローの強制終了時にのみ実行されるものなので通常の処理の中では実行されません。
また強制終了時にも必ず実行されるとは限りません。
強制終了は原則的に一つのコンポーネントの実行が終わり次のコンポーネントの実行に遷移するタイミングで実行されます。
強制終了がリクエストされるとフローの実行エンジンは指定のタイムアウト秒(デフォルトは5秒)現在実行中のコンポーネントが
終了するのを待ちます。
それでも実行が終了しない場合はcancelメソッドを実行し、trueが返ってきた場合は再度タイムアウトまでコンポーネントの終了を
待ち、それでも実行が終了しなかった場合は「強制終了が指定時間内に終了しなかった」というエラーになります。
つまりcancelメソッドではそのコンポーネントのexecute(またはexecuteLoop)の処理を中断しそこから抜けるような処理を記述します。
このメソッドは他のメソッドと違いフローの実行スレッドとは異なるスレッドから実行されることに注意してください。
強制終了時のキャンセル実装の詳細については後述のキャンセル処理の実装の章を参照してください。
各メソッドがどういうタイミングで実行されるかはそのメソッド内でログを出してみると良くわかります。
sampleフォルダにコンポーネントの各メソッドやトランザクションの中でログを出力するだけのサンプル(MethodLogコンポーネント)があるので、
詳細な実行タイミングを確認したい場合はそちらを動作させてみてください。
MethodLogコンポーネントのソース
MethodLogコンポーネントの定義ファイル
ExecuteContextはフローの実行コンテキストを表すクラスです。
実行コンテキストはリクエストの開始から終了まで引き回され、コンポーネントやマッパー関数の各種コールバックメソッドにはすべてこのインスタンスが引数として渡されます。
このクラスを用いることで
を行うことができます。
本製品ではログ出力のライブラリとしてApache Jakarta Projectの Log4Jを使用しています。
ExecuteContextの各種ログ出力メソッドはLog4Jのラッパーとなっています。
ExecuteContextには以下のログ出力用のメソッドがあり、それぞれのメソッドに文字列を渡した場合のログの出力形式(Log4Jのパターン"%m"に対応)は以下のようになります。
メソッド | 出力形式 |
---|---|
debug | メッセージコード: [リクエストID] デバッグ(コンポーネント名) :メッセージ |
debugInfo | メッセージコード: [リクエストID] デバッグ情報(コンポーネント名) :メッセージ |
info | メッセージコード: [リクエストID] 情報(コンポーネント名) :メッセージ |
warn | メッセージコード: [リクエストID] 警告(コンポーネント名) :メッセージ |
error | メッセージコード: [リクエストID] エラー(コンポーネント名) :メッセージ |
fatal | メッセージコード: [リクエストID] 致命的エラー(コンポーネント名) :メッセージ |
debugInfo以外の各メソッドはLog4Jのlevelに対応しています。
debugInfoはフローの実行がnormalモードで行われている場合はdebugレベルで、それ以外のモードで実行されている場合は infoレベルとして出力されます。
つまりdebugInfoで出力したメッセージはフローがデバッグモードで実行されている場合のみ出力されることになるので、コンポーネントからデバッグ情報を出力する場合はこのメソッドを使用してください。
(正確に言うとログ出力がどのように制御されるかはFSMCのログ設定によっても変わってくるのですが、コンポーネント開発者にとってはこのように理解していれば十分です。)
フローの実行ログのカテゴリは
asteria.flow.プロジェクトオーナー名.プロジェクト名.フロー名
というカテゴリになります。(プロジェクトオーナー名はドメインを含むフルネームで、ドメインは「.」で区切られます。)
つまり、フローごとにログの出力先を変えたり、ログ出力レベルを変更したりすることができます。
各種変数は以下のメソッドで取得することができます。
変数 | メソッド | クラス |
---|---|---|
フロー変数 | getFlowVariables | VariableList |
システム変数 | getSystemVariables | SystemVariables |
外部変数セット | getExternalVariables | ExternalVariables |
注意
VariableListやそれが保持しているValueクラスには値の設定メソッドがありますが、コンポーネントの中でフロー変数などに直接値を設定してはいけません。
Mapper以外のコンポーネントが値を設定してしまうと、フロー開発者がそれをトレースできなくなります。
関連する複数のコンポーネントをひとつのフローの中で使用する場合、それらのコンポーネントで同じオブジェクトを共有したい場合があります。
getIndependentMapメソッドによってリクエストの実行中に引き回されるMapが取得できるので、そこにオブジェクトをputすることで複数のコンポーネントから同じオブジェクトに
アクセスできるようになります。
IndependentMapの使用方法の詳細についてはIndependentMapの使用の章を参照してください。
それぞれ getProjectOwnerメソッドとgetUserメソッドで取得することができます。
getProjectOwnerがnullを返すことはありませんが、getUserは実行ユーザーが存在しない場合にnullを返します。
セッション(FlowSession)はgetSessionメソッドで取得することができます。
フローの実行時には必ずセッションがExecuteContextに関連付けられており、明示的に終了するかタイムアウトするまでサーバー側で保持されます。
同一セッションでリクエストを実行した場合にはセッションを介して複数のリクエスト間でなにかしらのオブジェクトを共有することが可能です。
(外部変数セットのセッション変数がこれにあたります。)
またセッションは内部にMapを保持しておりこのMapにはSDKユーザーが独自に値を設定することが可能です。(get/put/removeメソッド)
つまりコンポーネントの実行コードからセッションになにかしらのオブジェクトを設定し、それを同一セッションの別リクエストで取り出すような処理が可能です。
セッションに値を設定する場合は重複を避けるためにJavaのパッケージ名と同様にドメイン名を付加した名前をキーとして使用してください。
設定したオブジェクトがセッション終了時やタイムアウト時にファイナライズ処理を必要とする場合はSessionListenerを追加してください。
getConnectionメソッドによってコネクションを取得することができます。
getConnectionメソッドには
の二つがあります。(実際には種別と名前を指定するメソッドは内部的にConnectionPropertyを生成して後のメソッドを実行しています。) 指定したコネクションが存在しない場合はExceptionとなります。
取得するコネクションが既にフロー内で使用されている場合は、これらのメソッドで取得できるコネクションのインスタンスは先に使用されたコネクションと同じになります。
例えば、作成するコンポーネントがRDBコネクションを使用し、「TEST1」というRDBConnectionを取得しようとした場合に、それ以前にRDBGet/RDBPutなどのコンポーネント
が「TEST1」というコネクションを使用していた場合は取得されるコネクションは先に使用されたものと同じインスタンスになります。
コネクションの使用方法の詳細についてはコネクションの使用の章を参照してください。
コンポーネントがトランザクションをサポートする場合はexecuteまたはexecuteLoopメソッド内で
Transactionインターフェース(またはそれを拡張したExtendedTransactinインターフェース)を実装したクラスを作成して、ExecuteContext#addTransactionメソッドに渡します。
追加されたTransactionはTransactionManagerによって適切なタイミングで実行されます。
トランザクションの使用方法の詳細についてはトランザクションの章を参照してください。
コンポーネント開発のとっかかりとして、まずは何も処理を行わず入力ストリームをそのまま出力するだけのコンポーネントのソースを以下に示します。
このコンポーネントは入力コネクタ、出力コネクタともに一つだけ持つコンポーネントなのでSimpleComponentのサブクラスとして作成します。
import com.infoteria.asteria.flowengine2.execute.ExecuteContext; import com.infoteria.asteria.flowengine2.flow.InputConnector; import com.infoteria.asteria.flowlibrary2.FlowException; import com.infoteria.asteria.flowlibrary2.component.SimpleComponent; import com.infoteria.asteria.flowlibrary2.stream.StreamType; public class DoNothingComponent extends SimpleComponent { public static final String COMPONENT_NAME = "DoNothing"; public String getComponentName() { return COMPONENT_NAME;} //1 public DoNothingComponent() { //2 getInputConnector().setAcceptLinkCount(1); //3 //getInputConnector().setAcceptContainer(true); //4 //getInputConnector().setExpandContainer(false); //5 getInputConnector().setAcceptType(StreamType.ALL); //6 getOutputConnector().setAcceptType(StreamType.ALL); //7 } public boolean execute(ExecuteContext context) throws FlowException {//8 passStream(); //9 return true; //10 } }
数字のコメントがついている個所ではそれぞれ以下のことを行っています。
コンポーネント名を返すメソッド
このメソッドはabstractメソッドなので必ず実装する必要があります。
ここで返されるコンポーネント名はサーバー上に登録されるすべてのコンポーネント内でユニークでなければなりません。
コンポーネント名には「<会社名>.」というプレフィクスをつけるなどの方法でユニーク性を確保してください。
原則としてAsteria以外の会社が「.」を含まない名前をコンポーネント名に使用することは禁止です。
その上で定義ファイル上では「displayName」を設定してできるだけわかりやすい名前をつけるようにしてください。
ここで定義したコンポーネント名と定義ファイルのComponent/@name属性の設定値は同じにしなければなりません。
コンポーネントには引数なしのコンストラクタが必要です。
入力コネクタが受け入れることのできるストリームの最大値を設定しています。
多くのコンポーネントは受け入れ可能なストリーム数は1です。
無制限にストリームを受け入れることができるコンポーネントの場合は
getInputConnector().setAcceptLinkCount(InputConnector.LINK_UNBOUNDED);
のように InputConnector#LINK_UNBOUNDEDというシンボルを使用して値を設定します。
コネクタがストリームコンテナを受け入れるかどうかを設定しています。(ストリームコンテナについては後述します。)
コンポーネントがストリームコンテナを受け入れない場合に、ストリームコンテナが流れてきた場合はExceptionが発生します。
この設定のデフォルト値はtrueなのでソース中ではコメントアウトされています。
コネクタがストリームコンテナを受け入れた時にそれを展開するかどうかを設定しています。
複数ストリームを受け入れ可能なコネクタではgetStreamArrayメソッドを使用することで、入力ストリームを配列として取得することができます。
この時にこの設定値がtrueであれば、コンテナ内のストリームがすべて展開されて配列の要素となります。
falseの場合はストリームコンテナ自体が配列の1要素となります。
この設定のデフォルト値はfalseなのでソース中ではコメントアウトされています。
入力コネクタが受け入れ可能なストリームフォーマットを指定しています。
設定値にはStreamTypeクラスで宣言されているシンボルを使用します。
例えばTextとHTMLのストリームだけを受け入れる場合は
getInputConnector().setAcceptType(StreamType.TEXT|StreamType.HTML);
のように設定します。
すべてのストリームフォーマットを受け入れる場合はStreamType.ALLを使用します。
受け入れ不可のストリームが流れてきた場合は、Exceptionが発生します。
出力コネクタが受け入れ可能なストリームフォーマットを指定しています。
指定方法は入力コネクタの場合と同じです。
3〜7の入出力コネクタの設定はコンポーネント作成の際には必ず設定する必要のある項目です。
コンストラクタを作成したら最初に設定するようにして下さい。
executeメソッド
コンポーネントが実際に行う処理を記述するメソッドです。
このメソッドはabstractメソッドなので必ず実装する必要があります。
passStreamメソッドを使用すると入力ストリームがそのまま出力ストリームとしてセットされます。
コンポーネント内でストリームを作成して出力する場合は
StreamDataObject os = ...;//出力ストリームを作成する。(後述) setOutputStream(os);
のように作成したストリームをSimpleComponent#setOutputStreamメソッドで設定します。
このコンポーネントはループの起点とならないので常にtrueを返しています。
このコンポーネントのソースコードと定義ファイルはsampleフォルダにもあります。
DoNothingコンポーネントのソース
DoNothingコンポーネントの定義ファイル
このコンポーネントは何もしないのでコンパイルして動かしてみても何も起きません。
ここに実際に処理を行うコードやプロパティを追加していくことで実用的なコンポーネントを作っていくことができます。
コンポーネント開発の上でもうひとつ重要な要素としてプロパティがあります。
プロパティとはコンポーネントの振る舞いを決めるためにフロー開発者がデザイナー上で値を設定するものの総称です。
通常は設計時にデザイナー上で値を設定しますが、マッパーやプロパティ式を用いることで動的に値を設定することもできます。
またExcelInputコンポーネントなどのようにインスペクタ上には表示されない隠しプロパティを持ち、デザイナーに組み込まれた拡張UIによって値を設定しているものもあります。
逆から言えばコンポーネント開発者は自作コンポーネントの機能のうちフロー開発者に値を設定させたいものをプロパティにします。
プロパティはJavaソースコード上ではコンストラクタで登録し、定義ファイル上ではProperty要素として定義します。
サーバー側に標準で組み込まれているプロパティクラスの一部のクラス図を以下に示します。
実装的にはプロパティとはPropertyインターフェースを実装したクラスのことです。
コンポーネント開発者はこれらのインスタンスを生成して、自作コンポーネントに登録します。
図ではPropertyインターフェースの直接の実装クラスとして、ValueProperty、CategoryProperty、ConnectionPropertyの3つがありますが、以下にそれぞれの概要を説明します。
ValuePropertyはインスペクタ上でプロパティ名と値が1対1で対応するプロパティの抽象基底クラスです。
つまりインスペクタの基本タブに1行1プロパティで表示されそこで値が設定されます。
もちろんマッパーによる値のマッピングやプロパティ式での設定も可能です。
設定値のデータ型により、StringProperty、IntegerPropertyなどの具象クラスがあります。
EnumPropertyは列挙値だけを値として許可するプロパティです。列挙値として指定された以外の値が設定された場合はエラーとなります。
StringPropertyからさらに派生したプロパティは、なんらかの付加機能があるプロパティです。
例えばRegExpPropertyにはプロパティ値として正規表現が設定されるのでそれに文字列がマッチするかどうかを判断するmatchメソッドが加えられています。
またPathResolverPropertyにはプロパティ値としてファイルパスが設定されるので、それが相対パスであろうと絶対パスであろうと適切に対応するFileオブジェクトを取得する
getFileメソッドがあります。
CategoryPropertyはインスペクタ上でタブページとして表示されるプロパティに対応します。
これは名前と値のセットを定義するためのプロパティであり、値はマッピングによって設定することも可能です。
列項目として名前、データ型、デフォルト値の3つを持つものがSimpleCategoryPropertyで、 それ以上の項目が必要な場合はExtendedCategoryPropertyを使用します。
ConnectionPropertyはFSMCやデザイナーで定義した各種コネクションを使用するためのプロパティです。
このプロパティをキーにしてExecuteContext#getConnectionメソッドを呼び出すことにより各種コネクションを取得できます。
ConnectionPropertyはマッピングによって値を設定することはできません。(マッピング可能なプロパティはValuePropertyとCategoryPropertyのサブクラスだけです。)
コネクションの使用方法の詳細についてはコネクションの使用の章を参照してください。
プロパティを定義するにはコンポーネントのコンストラクタ内で作成したPropertyをregistPropertyメソッドで登録します。
以下にいくつかの種類のプロパティを作成するコードを示します。
import com.infoteria.asteria.flowengine2.execute.ExecuteContext; import com.infoteria.asteria.flowengine2.flow.InputConnector; import com.infoteria.asteria.flowlibrary2.FlowException; import com.infoteria.asteria.flowlibrary2.component.SimpleComponent; import com.infoteria.asteria.flowlibrary2.stream.StreamType; import com.infoteria.asteria.flowlibrary2.property.*; import com.infoteria.asteria.connection.RDBConnection; import com.infoteria.asteria.connection.RDBConnectionEntry; import java.util.Iterator; import java.sql.Connection; import java.sql.SQLException; /** * Propertyのテストコンポーネント */ public class PropertyTestComponent extends SimpleComponent { public static final String COMPONENT_NAME = "PropertyTest"; public String getComponentName() { return COMPONENT_NAME;} private IntegerProperty propA = new IntegerProperty("A", true, true); private IntegerProperty propB = new IntegerProperty("B", true, true); private IntegerProperty propC = new IntegerProperty("C", false, true); private SimpleCategoryProperty propCategory = new SimpleCategoryProperty("MyCategory"); private StringProperty propLogMethod = new StringProperty("LogMethod", true, false); private ConnectionProperty propConnection = new ConnectionProperty(RDBConnectionEntry.TYPE, "Connection", false); private StringProperty propDriver = new StringProperty("DriverName", false, true); public PropertyTestComponent() { getInputConnector().setAcceptLinkCount(1); //getInputConnector().setAcceptContainer(true); //getInputConnector().setExpandContainer(false); getInputConnector().setAcceptType(StreamType.ALL); getOutputConnector().setAcceptType(StreamType.ALL); //プロパティの登録 registProperty(propA); registProperty(propB); registProperty(propC); registProperty(propCategory); registProperty(propLogMethod); registProperty(propConnection); registProperty(propDriver); } public boolean execute(ExecuteContext context) throws FlowException { //プロパティ「A」と「B」に値を設定してその和をプロパティ「C」に設定する。 int a = propA.intValue(); int b = propB.intValue(); propC.setValue(a + b); //プロパティ「MyCategory」に設定したNameと値をプロパティ「LogMethod」で指定されたメソッドでログに出力する。 Iterator it = propCategory.keySet().iterator(); while (it.hasNext()) { String name = (String)it.next(); String msg = name + " = " + propCategory.getValue(name); outputLog(context, msg); } //プロパティ「Connection」に設定されたRDBConnectionのドライバー情報をプロパティ「DriverName」に設定する。 if ( !propConnection.isNull()) { RDBConnection rcon = (RDBConnection)context.getConnection(propConnection); Connection con = rcon.getConnection();//ここで取得したjava.sql.Connectionは自由に使用することができる。 try { propDriver.setValue(con.getMetaData().getDriverName()); } catch (SQLException e) { //例外が発生した場合はそのメッセージをプロパティに設定する propDriver.setValue(e.toString()); } } passStream(); return true; } private void outputLog(ExecuteContext context, String msg) { String method = propLogMethod.strValue(); if (method.equals("fatal")) context.fatal(msg); else if (method.equals("error")) context.error(msg); else if (method.equals("warn")) context.warn(msg); else if (method.equals("info")) context.info(msg); else if (method.equals("debugInfo")) context.debugInfo(msg); else if (method.equals("debug")) context.debug(msg); } }
executeメソッド内で行っている処理の内容には特に意味はありません。
ここでおさえてほしい内容は以下の事柄です。
次にこのコンポーネントの定義ファイルを示します。
<?xml version="1.0" encoding="utf-8"?> <ComponentDefine version="4.0" xmlns="http://www.infoteria.com/asteria/flowengine/definition"> <xsc lang="en"> <Component category="Sample" icon="" name="PropertyTest" toolTip="PropertyTest"> <Class>PropertyTestComponent</Class> <Property name="Exception" mapping="false" toolTip="" type="exception"/> <Property name="A" required="true" toolTip="A" type="int">0</Property> <Property name="B" required="true" toolTip="B" type="int">0</Property> <Property name="C" readonly="true" mapping="input" toolTip="A+B" type="int">0</Property> <Property name="LogMethod" choiceItem="fatal
error
warn
info
debugInfo
debug" mapping="false" required="true" toolTip="Log method" type="choice">info</Property> <Property name="Connection" connection="RDBConnection" mapping="false" toolTip="RDBConnection" type="connection"/> <Property name="DriverName" readonly="true" mapping="input" toolTip="DriverName" type="string"/> <Category key="Name" name="MyCategory"> <Property name="Name" toolTip="Name" type="string"/> <Property name="Default" toolTip="Default value" type="string"/> </Category> <Input accept="ALL"/> <Output streamPassThrough="true"/> </Component> </xsc> <xsc lang="ja"> <Component category="Sample" icon="" name="PropertyTest" toolTip="プロパティテスト"> <Class>PropertyTestComponent</Class> <Property name="Exception" displayName="汎用" mapping="false" toolTip="" type="exception"/> <Property name="A" displayName="値A" required="true" toolTip="A" type="int">0</Property> <Property name="B" displayName="値B" required="true" toolTip="B" type="int">0</Property> <Property name="C" displayName="値C" readonly="true" mapping="input" toolTip="A+B" type="int">0</Property> <Property name="LogMethod" displayName="ログメソッド" choiceItem="fatal
error
warn
info
debugInfo
debug" mapping="false" required="true" toolTip="Log method" type="choice">info</Property> <Property name="Connection" displayName="コネクション名" connection="RDBConnection" mapping="false" toolTip="RDBConnection" type="connection"/> <Property name="DriverName" displayName="ドライバー名" readonly="true" mapping="input" toolTip="DriverName" type="string"/> <Category key="Name" name="MyCategory" displayName="カテゴリー" > <Property name="Name" displayName="名前" toolTip="Name" type="string"/> <Property name="Default" displayName="値" toolTip="Default value" type="string"/> </Category> <Input accept="ALL"/> <Output streamPassThrough="true"/> </Component> </xsc> </ComponentDefine>
定義の詳細については定義ファイルリファレンスを参照してください。
ここでおさえてほしい内容は以下の事柄です。
ここではでてきませんが、SimplePropertyControllerというデザイナーのプラグインを使うことでプロパティの表示/非表示をコントロールすることも可能です。
例えばHTTPGetコンポーネントでは「コネクションを使用」というプロパティ値によってURLを指定するために使用する(=デザイナーで表示される)プロパティが異なります。
この場合でもサーバー側ではデザイナーの表示とは無関係に、単に「コネクションを使用」プロパティの値で処理を分岐するだけです。
整合性をとりながらフロー開発者が少しでもコンポーネントを使いやすくなるようにプロパティを設計してください。
ストリームとはフローを流れていくデータのことです。
実装的にはストリームはStreamDataObjectというクラスのサブクラスであり以下の10種類があります。
種別 | クラス | 説明 |
---|---|---|
Binary | StreamDataBinary | バイナリーデータ |
Text | StreamDataText | テキストデータ |
HTML | StreamDataHTML | HTMLデータ |
MIME | StreamDataMIME | MIMEデータ |
Record | StreamDataRecord | 行と列を持つレコード形式データ |
CSV | StreamDataCSV | CSVデータ。行と列を持つレコード形式データでもある |
FixedLength | StreamDataFixedString | 固定長データ。行と列を持つレコード形式データでもある |
ParameterList | StreamDataParameterList | 主にHTMLのフォームやSOAPのパラメーターを扱うためのデータ |
XML | StreamDataXML | XMLデータ。DOMで扱うことができ、また繰り返し構造をレコード形式で扱うこともできる |
JSON | StreamDataJSON | JSONデータ。JSONObjectで扱うことができ、また繰り返し構造をレコード形式で扱うこともできる |
また、これらのストリームをまとめて扱うための枠組みとしてコンテナ(StreamDataContainer)があります。
ストリームを扱うコンポーネントを作成する場合、次のことを決める必要があります。
多くの場合、コンポーネントで扱うストリームはひとつだけであり複数の入力ストリーム(あるいはコンテナ)を検討しなければならないケースはあまりありません。
(例えばXSLTコンポーネントは入力としてひとつのXMLストリームを受け入れ、それを変換して出力ストリームを作成します。)
しかしメールコンポーネントの添付ファイルのように複数ストリーム、あるいはコンテナでなければ素直に実装できない処理もなかには存在します。
どのようなケースでコンテナを検討すべきかはまた別途説明します。
ストリーム関連のクラスのクラス図を以下に示します。
ストリームの操作で使用するほとんどのメソッドは基底クラスであるStreamDataObjectで定義されています。
各具象クラスではストリームプロパティに対するGetterメソッドがありますが、コンポーネント開発者がストリームプロパティを
意識しなければならないケースはほとんどありません。(例えば生成するTextストリームのエンコーディングを何にするか?などは
フロー開発者が意識するべきことでコンポーネント開発の本質には関係ありません。)
これ以外にStreamDataXMLにはDOMのDocumentを取得するためのgetDocumentメソッドがあり、 StreamDataParameterListにはパラメーター値を設定/取得するためのメソッドがあります。 また、StreamDataJSONには構造化されたJSONObjectを取得するためのgetJsonメソッドがあります。
StreamDataRecordにはさらにサブクラスとしてデータをListで扱うためのクラスとJDBCのResultSetをラップしたクラスがありますが、 通常コンポーネント開発者がこれらのクラスを意識することはありません。
入力ストリームはInputConnectorから取得します。
単一ストリームのみを受け入れるコンポーネントではInputConnector#getStreamメソッドを使用してStreamDataObjectを取得し、
複数ストリームの受け入れが可能なコンポーネントではInputConnector#getStreamArrayメソッドで、StreamDataObjectの配列を取得します。
コンテナを扱うコンポーネントの場合はInputConnector#setExpandContainerを設定しておくことで、コンテナをgetStreamArrayメソッドでばらして取得することもできます。
この場合コンテナは複数の入力ストリームが差し込まれた場合と同様にStreamDataObjectの配列に展開されるので、この時に取得されるStreamDataObjectがStreamDataContainerのインスタンスとなることはありません。
例えばメールコンポーネントはサブコネクタで複数ストリームもコンテナも扱うことのできるコンポーネントですが、どちらの場合もそれらをすべて添付ファイルとするので、 最初にExpandContainerを設定しておけばコンポーネント内部の処理ではコンテナを意識する必要はなくなります。
StreamDataObjectにはbyteValue, strValue, getDataInputStream, getDataStringReader, getRecordという5つのメソッドがあり、それぞれストリームのバイナリ値、文字列値、レコードを取得することができます。
これ以外にStreamDataXMLにはDOMのDocumentを取得するためのgetDocumentメソッドがあり、 StreamDataJSONにはJSONObjectを取得するためのgetJsonメソッドがあり、 StreamDataParameterListにはパラメーター値を設定/取得するためのメソッドがあります。
コンポーネントはこれらのメソッドを使用して入力ストリームのデータにアクセスしながら処理を行います。
種別 | メソッド | 取得されるオブジェクト |
---|---|---|
すべて | byteValue | byte[] |
getDataInputStream | java.io.InputStream | |
strValue | java.lang.String | |
getDataStringReader | java.io.Reader | |
getRecord | com.infoteria.asteria.flowlibrary2.stream.Record | |
XML | getDocument | org.w3c.dom.Document |
JSON | getJson | com.infoteria.asteria.util.json.JSONObject |
ParemterList | getValue | Value |
getValueArray | Value[] | |
getObjectArray | Object[] |
StreamDataParameterListのgetValueArray, getObjectArrayメソッドはフィールドが配列の場合に使用するメソッドです。
歴史的な経緯からValueの配列を返すメソッドとObjectの配列を返すメソッドのふたつがありますが、通常はgetValueArrayメソッドを使用します。
それぞれのストリームで、byteValue、strValue、getDataInputStream, getDataStringReader, getRecordメソッドが返す内容は以下のようになります。
種別 | byteValue/getDataInputStream | strValue/getDataStringReader | getRecord |
---|---|---|---|
Binary | バイナリーデータそのもの | バイナリーデータの16進ダンプ | 「Object」というフィールド名でBinary型の1行1列のレコード |
Text | テキストデータをエンコーディングでバイト列化したもの | テキストデータそのもの | 「Object」というフィールド名でString型の1行1列のレコード |
HTML | HTMLデータをエンコーディングでバイト列化したもの | HTMLデータそのもの | 「Object」というフィールド名でString型の1行1列のレコード |
MIME | MIMEデータのバイト列 | MIMEデータをヘッダのcharsetで文字列化したもの | 「Object」というフィールド名でBinary型の1行1列のレコード |
Record | RecordのXML表現 | RecordのXML表現 | フィールド定義に沿ったレコード |
CSV | CSVデータをエンコーディングでバイト列化したもの | CSVデータそのもの | フィールド定義に沿ったレコード |
FixedLength | 固定長データそのもの | 固定長データをエンコーディングで文字列化したもの | フィールド定義に沿ったレコード |
ParameterList | ParameterListのXML表現 | ParameterListのXML表現 | フィールド定義に沿い、配列フィールドの長さだけ行を持つレコード |
XML | XMLそのもの | XMLそのもの | フィールド定義に沿ってXMLの繰り返しをレコード化したもの |
JSON | JSONそのもの | JSONそのもの | フィールド定義に沿ってJSONの繰り返しをレコード化したもの |
コンテナ | コンテナ内の各ストリームのbyteValueを連結したもの | コンテナ内の各ストリームのstrValueを連結したもの | コンテナ内の各ストリームのレコードすべて |
グレイになっているメソッドは通常は使用しないメソッドです。
RecordやParameterListのbyteValueやstrValueは便宜的にXMLを返していますが、通常これらの値を使用することはありません。
また、コンテナではgetRecordメソッドは各ストリームのレコードに透過的にアクセスできます。
例えばコンテナの中身がそれぞれ3行、5行、7行のデータを持つCSVである場合、15行のデータを持つひとつのストリームと同じように扱うことができます。
コンテナのbyteValue/getDataInputStream、strValue/getDataStringReaderは各ストリームの値を連結した値を返していますがコンテナを扱うコンポーネントはほとんどの場合、
のいずれかなのでほとんど使用されることはありません。
入力ストリームを扱う際の注意
これらのメソッドで取得されるバイト列やRecord、StreamDataXMLのDocumentなどに対して直接変更を加えてはいけません。
これらのメソッドの多くはデータのコピーではなく、参照を返します。
もしも自作コンポーネントが入力ストリームのデータを直接変更してしまった場合、
左図のようなパラレルのあるフローでは、自作コンポーネントの行った変更がVelocityコンポーネントの方で参照されることになります。
つまりVelocityコンポーネントで取得される入力ストリームがFileGetコンポーネントが出力した内容と異なるという結果になるので、このような操作は行ってはいけません。
ストリームのRecordインターフェースはStreamDataObject#getRecordメソッドにより取得されます。
ストリームがレコードを持っていない場合はこのメソッドはnullを返します。(フィールド定義を持たないXMLストリームなどがこれにあたります。)
ここで取得されたRecordにどのようなフィールドがあるかという情報を保持しているのはFieldDefinitionというクラスであり、
それはStreamDataObject#getFieldDefinitionというメソッドで取得できます。
ストリームがフィールド定義を持っていない場合はこのメソッドはnullを返します。
Recordインターフェースはストリーム内の各レコードのイテレータになっていて、getRecordメソッドで取得された時点でそのポインタは先頭レコードを指しています。
通常はnextやnextRecordメソッドを使用して1レコードずつポインタをずらしながら各レコードにアクセスします。
Recordを最後までイテレートできなかった場合は、closeResourcesメソッドを使用して、Recordの内部リソースを解放する必要があります。
典型的なレコードインターフェースの使用方法は以下のようになります。
StreamDataObject is; Record record = is.getRecord(); try { while (record != null) { Value v1 = record.getValue("Field1"); Value v2 = record.getValue("Field2"); //ここで何かv1, v2を用いた処理を行う record = record.nextRecord(); } } finally { if (record != null) { record.closeResources(); } }
Recordインターフェースにはnext以外にもabsoluteなどレコード番号を指定して各レコードにアクセスするメソッドもありますが、これらのメソッドはストリームの 種類によってはパフォーマンスが悪いことがあるので、可能な限りnextを使用してレコードを先頭からなめるようなアクセス方法で使用してください。
Record#getValueの注意
ストリームの種類によってはgetValueメソッドの返すValueオブジェクトのインスタンスがRecordをイテレートしても同じインスタンスを返すものがあります。
つまり次のようなコードは意図したとおりに動かない可能性があります。
StreamDataObject is; Record record = is.getRecord(); try { Value preValue = null; while (record != null) { Value v1 = record.getValue("Field1"); Value v2 = record.getValue("Field2"); if (preValue != null && preValue.equals(v1)) { //Field1の値が直前のレコードと同じだった場合に何かする } preValue = v1; record = record.nextRecord(); } } finally { if (record != null) { record.closeResources(); } }
この場合ストリームの種類によってはpreValueとv1が同じインスタンスになるので意図したとおりには動作しません。
(アクセスするフィールドが異なる場合は返されるValueインスタンスは同じにはなりません。上の例ではv1とv2が同じインスタンスになることはありません。)
上のようなことを行う場合、以下のように書きます。
StreamDataObject is; Record record = is.getRecord(); try { String preValue = null; while (record != null) { Value v1 = record.getValue("Field1"); Value v2 = record.getValue("Field2"); if (preValue != null && preValue.equals(v1.strValue())) { //Field1の値が直前のレコードと同じだった場合に何かする } preValue = v1.strValue();//Value#strValue()はnullを返すことはありません。 record = record.nextRecord(); } } finally { if (record != null) { record.closeResources(); } }
入力ストリームがコンテナ(StreamDataContainer)だった場合にそれがどのように扱われるかはInputConnector#setAcceptContainerとInputConnector#setExpandContainerの設定値によって決まります。
true | false | |
---|---|---|
AcceptContainer | コンテナを受け入れる | 入力ストリームにコンテナがあった場合は Exceptionとなる |
ExpandContainer | InputConnector#getInputStreamArrayを使用した時に コンテナは展開されて取得される | コンテナはコンテナのまま取得される |
コンポーネントがコンテナをどのように扱うかはそのコンポーネントの設計によりますが、可能な限り受け入れるようにしておいた方がフロー開発者の利便性はあがります。
どのような仕様とするかはコンポーネント開発者の自由ですが、ある程度の指針を以下に示します。
ストリーム自身の属性はデザイナー上ではストリームプロパティとして定義されます。
通常コンポーネント開発者はこれらのストリームプロパティを意識する必要はありません。
何故ならCSVの区切り文字が何であるかとかXMLのエンコーディングがなんであるかなどはコンポーネント内で行う処理にはほとんどの場合無関係だからです。
デザイナーで設定されたストリームプロパティを保持しているのはStreamFactoryというクラスです。
StreamFactoryはOutputConnectorからgetStreamFactoryメソッドで取得することができます。
ただし、入力ストリームをそのまま出力するコンポーネント - つまり定義ファイルで「Output/@streamPassThrough="true"」と定義されたOutputConnectorでは
このメソッドはnullを返します。
ストリームを作成する場合はこのStreamFactoryのcreateメソッドにストリーム内容のデータを渡します。
StreamFactoryは作成するストリーム型ごとにサブクラスが用意されており、型によっては引数の異なるcreateメソッドが追加されています。
ここで作成されたStreamDataObjectにはストリームプロパティがすべて適用されています。
(StreamDataObjectにはストリームプロパティに対するGetterメソッドはあってもSetterメソッドはありません。)
StreamFactoryのクラス図を以下に示します。
基底クラスであるStreamFactoryではbyte[]を引数とするcreateメソッドがあります。
InputStreamやFileを引数とするcreateメソッドもあります。これらのメソッドは大容量ストリームを使用する場合に特に重要です。大容量ストリームでない普通のストリームでは、これらのメソッドは最終的にはbyte[]を引数とするcreateメソッドを呼び出しているだけです。
createEmptyStreamは空のストリームを作成するためのメソッドです。
以下にそれぞれのストリームを作成する場合にどのようなデータを引数として渡せば良いかを説明します。
StreamDataBinaryはbyte[]を引数として作成します。
ここで引数となるbyte[]はどのようなバイト列であっても構いません。
StreamDataTextは通常はStringを引数として作成します。
byte[]を引数とする場合はそれはストリームプロパティに指定されたエンコーディングでなければなりません。
(あるいは第2引数でエンコーディングを明示します。)
StreamDataHTMLの作成方法はStreamDataTextとすべて同じです。
引数となるデータはHTMLであるべきですが、実際にはデータがHTMLであるかどうかの検証は一切行われていません。
StreamDataTextとStreamDataHTMLの違いは、MIMEに変換した場合(ブラウザから実行するフローで結果ストリームとして返された場合)に Content-Typeが「text/plain」となるか「text/html」となるかだけです。
StreamDataMIMEの作成ではbyte[]とMIMEType(StreamDataMIME.Type)を引数として指定します。
引数となるbyte[]はMIMEとして正当なバイト列である必要があります。
MIMETypeには「HTTP」と「MIME」があり、そのMIMEをHTTPで使用する場合は「HTTP」をメールで使用する場合は「MIME」を指定します。
(MIMETypeが省略された場合は「HTTP」となります。)
実際にはエンドユーザーのレベルではMIMETypeについてほとんど気にする必要がありません。
StreamDataMIME#byteValueの返す値はMIMETypeの指定に関わらず、メソッドの引数となったbyte[]そのものです。
MIMETypeを使用するのはStreamDataMIMEが内部で使用するPartオブジェクトですが、Partオブジェクトの詳細は非公開です。
独自にMIMEをパースして扱う必要がある場合はJavaMailなどの外部ライブラリをご利用ください。
StreamDataCSVの作成方法にはStreamDataTextと同じ作成方法の他に、java.util.Listを引数とするメソッドがあります。
このメソッドを使用する場合、Listの内容はフィールド定義で定義されたフィールド数と同じ要素数を持つString[]のリストとなっていなければなりません。
(Listの各アイテムが1行分のレコードデータとなります。)
またそこで使用するListはSerializableなクラスでなければなりません。(例えばArrayList#subListはSerializableでないListを返すので使用してはいけません。)
Stringやbyte[]を引数とする場合はその内容はストリームプロパティで指定された区切り文字で区切られたCSVデータでなければなりませんが、 囲み文字については指定されている場合でも各フィールドが必ずしもその囲み文字で括られている必要はありません。
StreamDataFixedStringの作成方法にはbyte[]、String、Listをそれぞれ引数とする3つがあります。
byte[]を引数とする場合はそこに渡されるデータは完全にストリームプロパティ、およびフィールド定義に沿った内容でなければなりません。
Listを引数とするメソッドを使用する場合、Listの内容はフィールド定義の内容に沿ったValue[]のリストとなっていなければなりません。
またそこで使用するListはSerializableなクラスでなければならないのはCSVの場合と同じです。
Stringを引数とする場合は、ストリームプロパティで指定されたエンコーディングでbyte[]に変換されてからパースされます。
(実質的にはStringを引数とするメソッドが使用されることはほとんどありません。)
StreamDataXMLの作成方法にはbyte[]、String、org.w3c.dom.Documentをそれぞれ引数とする3つがあります。
Stringまたはbyte[]を引数とする場合はその内容はXMLデータでなければなりません。
Documentを引数としてStreamDataXMLを作成する場合、そのDocumentはNamespace awareなDocumentでなければなりません。
本製品ではDOMのライブラリとしてXercesを使用していますが、Xercesでは Document#createElementを使用して
作成されたElementのgetLocalNameメソッドはnullを返します。
これはDOMの実装としては正しいですが、StreamDataXMLの引数となるDocumentではElementとAttributeは、getLocalNameメソッド
でnullを返してはいけません。
Documentの生成には原則としてDOMUtilのメソッドを使用し、コード中で要素や属性を生成する場合は名前空間なしの場合でもcreateElementNSなどのNamespaceAPIを使用してください。
※ DOMUtilはJAXPをラップしたDOMを扱うためのユーティリティクラスです。内部的にはXercesを使用しています。
StreamDataJSONの作成方法にはbyte[]、String、com.infoteria.asteria.util.json.JSONObjectをそれぞれ引数とする3つがあります。
Stringまたはbyte[]を引数とする場合はその内容はJSONデータでなければなりません。
JSONObjectを引数とする場合、そのJSONObjectは"root"という名前のキーだけを持っている必要がありますが、"root"はbyteValue()やstrValue()の出力には含まれません。
JSONはXMLと違ってトップレベルから配列になることがありえるので、仮のプレースホルダとして"root"を置いています。
StreamDataRecordを作成する場合は通常はListを引数とするメソッドを使用します。
Listの内容はフィールド定義の内容に沿ったValue[]のリストとなっていなければなりません。
またそこで使用するListはSerializableなクラスでなければならないのはCSVやFixedLengthの場合と同じです。
byte[]を引数とするメソッドを使用するメソッドにStreamDataRecord#byteValueの出力内容を渡せば、その内容と同じRecordストリームが作成されますが、 Recordストリームのバイナリ値は主としてデバッグ用途のためだけに用意されているものなので、基本的にはこのメソッドを使用してはいけません。
StreamDataParameterListを作成する場合は引数無しのcreateメソッドを使用します。
つまりデータの内容をコンストラクト時に指定することはできないので、StreamDataParemterListのインスタンス作成後にそのSetterメソッドを使用して値を設定していきます。
byte[]を引数とするメソッドを使用するべきではないのはStreamDataRecordの場合と同じです。
大容量ストリームはメモリの使用量を抑えて大量のデータを処理することができるストリームです。これは、データをメモリに保持するのではなく、一時ファイルに保存することで実現しています。
各開始コンポーネントで「大容量ストリームを使用」プロパティを「はい」に設定すると大容量ストリームが使用されます。このプロパティの値はExecuteContext#useHighCapacityStreamsで取得することができます。ただし、通常はこのプロパティの値を使用することはありません。
このプロパティが「はい」に設定されていると、大容量ストリームをサポートしている次のストリームに関しては、StreamFactoryは自動的に大容量ストリームを作成します。
大容量ストリームを使用するコンポーネントを開発する場合、特に、コンポーネント内で大量のデータを使用することが想定される場合、大量のメモリを消費しないようにコンポーネントを実装する必要があります。
まず、コンポーネントの入力ストリームから大量のデータを取得して処理しなければならない場合、StreamDataObjectの次のメソッドを使用しないようにします。
これらのメソッドは、ストリームのデータ全体をbyte[]またはStringとして取得するため、ストリームのデータ全体がメモリに読み込まれ、メモリを大量に消費する要因となります。
その代わりに、次のメソッドを使用しストリームのデータを少しずつ処理するようにします。
同様に、コンポーネントの出力ストリームを作成する場合も、メモリを消費しないように大容量ストリーム用のメソッドを使用します。
FileやInputStreamからストリームを作成する場合は、次のメソッドを使用します。
それ以外の場合は、StreamFactoryの次のメソッドを使用します。
まず最初に、initializeでStreamFactoryを初期化をします。
次に、ストリームタイプに応じたaddを使用してデータを少しずつ追加します。例えば、Recordストリームの場合は、add(Value[])を使用して、出力するレコードを1レコードずつ追加します。
それ以外にも、StreamFactoryに直接データを書き込むためにgetFactoryOutputStreamを使用することができます。
出力するデータの追加や書き込みが終わったら、createを呼び出して最終的なストリームを作成します。
最終的なストリームを作成したら、terminateを呼び出してStreamFactory内部で使用しているリソースを開放します。
次の例は、入力ストリームからレコードを取得し、何らかの処理をした後に1レコードずつ出力ストリームに追加することで出力ストリームを作成する例です。
Record record = getInputConnector().getStream().getRecord(); StreamFactoryRecord outputStreamFactory = (StreamFactoryRecord)getOutputConnector().getStreamFactory(); outputStreamFactory.initialize(); try { Value[] resultOneRecordValues; while (record != null) { resultOneRecordValues = doSomeProcessing(record); outputStreamFactory.add(resultOneRecordValues); record = record.nextRecord(); } setOutputStream(outputStreamFactory.create()); } finally { outputStreamFactory.terminate(); if (record != null) { record.closeResources(); } }
次の例は、Binaryなどの入力ストリームからデータを少しずつ読み込んで、そのまま読み込んだデータを出力ストリームに書き込むことで出力ストリームを作成する例です。
StreamDataObject inputStreamObject = getInputConnector().getStream(); StreamFactoryBinary outputStreamFactory = (StreamFactoryBinary)getOutputConnector().getStreamFactory(); outputStreamFactory.initialize(); try (OutputStream os = outputStreamFactory.getFactoryOutputStream(); BufferedInputStream bis = new BufferedInputStream(inputStreamObject.getDataInputStream())) { byte[] data = new byte[8192]; int read; while ((read = bis.read(data)) > -1) { os.write(data, 0, read); } setOutputStream(outputStreamFactory.create()); } finally { outputStreamFactory.terminate(); }
出力ストリームとしてコンテナを出力する場合はStreamDataContainerを作成して、そこにストリームを追加していきます。
追加するストリームはすべてストリームプロパティとフィールド定義が同じでなければなりません。
言い換えれば同じStreamFactoryから作成されたストリームでなければなりません。
出力ストリームとしてコンテナを使用することがあるのはほとんどの場合、外部ストレージからデータを取得するコンポーネントです。
これらのコンポーネントでは取得対象のデータをコンテナ化して、出力するかループでひとつずつ出力するかを選択できるようになっています。
基本的な指針としては出力ストリームとしてコンテナを検討する場合はループも合わせて検討してください。
ストリーム変数とはストリームがそのデータ内容とは別に持つ属性情報です。
フロー開発者はマッパーで任意のストリームに対して自由にストリーム変数を付加することができます。
コンポーネント内でストリーム変数を取得/設定するには、StreamDataObject#getStreamVariable/putStreamVariableメソッドを使用します。
ストリーム変数のコンポーネントでの使われ方には次の2つがあります。
外部ストレージからデータを取得するコンポーネントの多くはそのデータ自身に関する情報をストリーム変数に設定します。
例えばFileGetコンポーネントのFilePath(ファイルのフルパス)、FileDate(ファイルのタイムスタンプ)、FileSize(ファイルサイズ)をストリーム変数に設定します。
あるいはPOP3コンポーネントのようにフロー開発者が自分の取得したいメールヘッダーをCategoryPropertyとして設定することによりそれがストリーム変数となるものもあります。
これらの値を出力専用のコンポーネントプロパティではなく、ストリーム変数として設定することには次のようなメリットがあります。
FileGetコンポーネントのように固定の変数名でストリーム変数を設定する場合は定義ファイルのOutput要素以下に付加する変数の定義を追加します。
<Output accept="Binary;Text;HTML;CSV;FixedLength;XML;MIME" default="XML"> <Variables> <Field name="FilePath" type="String"/> <Field name="FileDate" type="DateTime"/> <Field name="FileSize" type="Integer"/> </Variables> </Output>
POP3コンポーネントのようにCategoryPropertyをそのままストリーム変数とする場合はCategory要素に「streamVariables="true"」という属性を追加します。
<Category displayName="メールヘッダー" key="Name" mapping="input" name="MailHeaders" streamVariables="true"> <Property choiceItem="Message-Id
Reply-To
In-Reply-To
Content-Type
Date
References
X-Mailer" displayName="ヘッダー名" name="Name" toolTip="header name" type="editableChoice"/> </Category>
定義の詳細は定義ファイルリファレンスを参照してください。
定義ファイルの設定はあくまでデザイナー上での振る舞いを定義しているだけなので、コンポーネントのソース側ではここで定義したとおりに
StreamDataObjectに値を設定しなければなりません。
複数のストリームを扱うコンポーネントには処理に使用する値をストリーム変数から取得するものがあります。
例えばメールコンポーネントは添付ファイルのファイル名をFilePathというストリーム変数から取得します。
添付ファイルは複数ある場合があるので、そのファイル名をコンポーネントプロパティとして設計した場合、2つ目以降のストリームにファイル名をつけることができません。
このような場合にストリームごとに値を設定できるストリーム変数の使用を検討します。
先にも述べたとおり、定義ファイルでOutput要素にstreamPassThrough属性を定義した場合、入力ストリームをそのまま出力すると言う定義となります。
この場合、そのコンポーネント(正確には出力コネクタ)では直前に接続したコンポーネントのストリーム定義が参照されるようになります。
つまり、その出力コネクタはストリーム定義を持たなくなるので、この時OutputConnector#getStreamFactoryはnullを返します。
通常このような定義を行った場合はコンポーネント側でも入力ストリームをそのまま出力するように作成しますが必ずそうしなければならないという事ではありません。
例えばRecordFilterコンポーネントは定義ファイル上ではstreamPassThroughとなっていますが、ソースコード上では入力ストリームをフィルタリングして
新しいストリームを作成して出力しています。
このようなコンポーネントを作成する場合は、StreamFactory#getInstanceメソッドにより入力ストリームのストリーム定義をコピーしたStreamFactoryを取得して使用します。
InputStream is = getInputConnector().getStream(); StreamFactory factory = StreamFactory.getInstance(is);//入力ストリームと同じ定義のStreamFactoryを生成 ...
コンポーネントによってはフロー開発者にフィールド定義を行わせる必要がなく、常に固定のフィールド定義となっていれば良いものがあります。
例えばFileListコンポーネントではファイルの各種属性情報を列として持つRecordストリームに定義が固定されています。
このようなコンポーネントを作成する場合は、定義ファイルにフィールド定義を追加した上でreadonlyとします。
<Output accept="Record"> <FieldDef readonly="true"> <Field name="FileName" type="String"/> <Field name="FilePath" type="String"/> <Field name="FileDate" type="DateTime"/> <Field name="FileSize" type="Integer"/> <Field name="FileType" type="String"/> </FieldDef> </Output>
上記定義で「readonly="true"」を外した場合は初期状態で指定のフィールド定義が設定されますが編集は可能となります。
(つまりフィールド定義のデフォルト値を指定していることになります。)
コンポーネントのソース側では通常と同じようにOutputConnectorからStreamFactoryを取得して使用します。
ここまで説明してきたとおりフローサービスのコンポーネントデザインではストリーム定義は出力ストリームに対して行うこととなっています。
つまり、原則的にはコンポーネント開発者はそのコンポーネントがどのようなストリームを生成して出力するかを考えます。
しかしコンポーネントによっては出力ストリームではなく、入力ストリームを規定したいものがあります。
最も典型的なのはRDBPutコンポーネントで、このコンポーネントではRDBのテーブルから更新する列を選択して、
それを入力ストリームのフィールド定義とします。
このようなコンポーネントを作成する場合、定義ファイルのInput要素に「defineStream="true"」という定義を付加します。
<Input accept="Record" defineStream="true"> <FieldDef readonly="true"/> </Input> </Output streamPassThrough="true"/>
RDBPutコンポーネントではフィールド定義は専用のテーブル選択ダイアログを使用して行うので、FieldDefがreadonlyとなっていますが
Input要素以下の定義方法はOutput要素の場合と同じです。
上の例では入力ストリームで定義したストリームをそのまま出力していますが、入力ストリームと出力ストリームの両方を
定義させることも可能です。
このように入力ストリームを定義する、とした場合その定義はデザイナーによって直前にリンクされたコンポーネントの出力ストリーム定義にコピーされます。
(RDBPutコンポーネントの前にマッパーをリンクするとそのマッパーでは出力ストリーム定義がRDBPutコンポーネントの内容と同期され変更できなくなります。)
これがどういうことかというと、コンポーネントのソースコード側には「入力ストリームを定義する」という概念は存在しないと言うことです。
デザイナー上ではそのコンポーネントを選択して入力ストリームの定義を行いますが、それはデザイナーによって前のコンポーネントにコピーされているので、
コンポーネントのコードを書く際には通常のコンポーネントの場合と同じようにInputConnectorから入力ストリームを取得してそれに対して処理を行えば良いだけです。
ループとはコンポーネントを実行し、後続のフローを実行した後に再びそのコンポーネントに戻ってきて以降の処理を繰り返すことです。
executeメソッドとexecuteLoopメソッドの章でも述べたとおり、ループをサポートするコンポーネントではexecuteメソッドだけではなく、
executeLoopメソッドも併せて実装します。
またコンパイラにそのコンポーネントがループする可能性があることを通知するためにloopPossibilityメソッドもオーバーライドします。
(このメソッドはコンパイラがフローの実行順序を検証する際に使用されます。)
ループの例として最も単純なものは標準で提供されているLoopStartコンポーネントです。
以下にそのソースコードを示します。
package com.infoteria.asteria.flowlibrary2.component.control; import com.infoteria.asteria.flowengine2.execute.ExecuteContext; import com.infoteria.asteria.flowlibrary2.FlowException; import com.infoteria.asteria.flowlibrary2.component.ComponentException; import com.infoteria.asteria.flowlibrary2.component.SimpleComponent; import com.infoteria.asteria.flowlibrary2.stream.StreamType; import com.infoteria.asteria.flowlibrary2.property.IntegerProperty; public class LoopStartComponent extends SimpleComponent { public static final String COMPONENT_NAME = "LoopStart"; public String getComponentName() { return COMPONENT_NAME;} private static final String INVALID_LOOP_COUNT = "1"; private IntegerProperty _loopCount = new IntegerProperty("LoopCount", true, true, 1); private int _count; public LoopStartComponent() { getInputConnector().setAcceptType(StreamType.ALL); getInputConnector().setAcceptLinkCount(1); getOutputConnector().setAcceptType(StreamType.ALL); registProperty(_loopCount); } public boolean loopPossibility() { return true; } public boolean execute(ExecuteContext context) throws FlowException { _count = _loopCount.intValue(); if (_count <= 0) throw new ComponentException(getMessage(INVALID_LOOP_COUNT, Integer.toString(_count))); _count--; passStream(); return _count == 0; } public int executeLoop(ExecuteContext context) throws FlowException { _count--; passStream(); return _count == 0 ? LOOP_END : LOOP_CONTINUE; } }
executeメソッドでプロパティに設定されたループ回数を取得してメンバー変数「_count」に設定しています。
executeLoopメソッドでは「_count」をデクリメントしながら、0になるまでループを繰り返しています。
loopPossiblityメソッドは常にtrueを返しています。
ループ回数が1回の場合はループしないわけですが、このメソッドはあくまで「ループする可能性がある」ということを
示すメソッドなので、これでかまいません。
(_loopCountプロパティにはマッパーで値がマッピングされることもあるので、ここで「_loopCount.intValue() > 1」のように
ループ回数をチェックすることは逆に不適切です。)
ループするコンポーネントでは定義ファイルのOutput要素に「loop="true"」という属性を付加します。
<xsc lang="ja" xmlns="http://www.infoteria.com/asteria/flowengine/definition"> <Component category="コントロール" icon="loop_n.png" name="LoopStart" toolTip="指定回数ループします"> <Class>com.infoteria.asteria.flowlibrary2.component.control.LoopStartComponent</Class> <Message key="1">ループする回数が不正です。: %1</Message> <Property displayName="汎用" mapping="false" name="Exception" toolTip="Exception" type="exception"/> <Property displayName="ループする回数" mapping="true" name="LoopCount" toolTip="LoopCount" type="int">1</Property> <Input accept="ALL"/> <Output loop="true" streamPassThrough="true"/> </Component> </xsc>
こうすることでデザイナーのストリームアイコンがループを示すアイコンに変わります。
FileGetコンポーネントやRDBGetコンポーネントのようにプロパティによってループするかどうかを制御するようなコンポーネントでは、
LoopProcess型のプロパティを使用します。
これはBoolean型のプロパティを拡張したもので値の変更に連動してループアイコンの表示が切り替わります。
(ソースコード側では単純にBooleanPropertyを使用すればOKです。)
分岐コンポーネントはソースコード的にはComponentExitを複数持つコンポーネントです。
シンプルコンポーネントとは具体的には以下の点が異なります。
つまり分岐コンポーネントを作成する場合は、上記の条件を満たすように作成しなければならないと言うことです。
2分岐のコンポーネントを作成する場合はSimpleBranchComponentを継承して作成するのが簡単です。
分岐コンポーネントのサンプルとして一定の実行回数ごとに分岐するコンポーネントを以下に示します。
import com.infoteria.asteria.flowengine2.execute.ExecuteContext; import com.infoteria.asteria.flowlibrary2.component.SimpleBranchComponent; import com.infoteria.asteria.flowlibrary2.FlowException; import com.infoteria.asteria.flowlibrary2.property.IntegerProperty; import com.infoteria.asteria.flowlibrary2.stream.StreamDataObject; import com.infoteria.asteria.flowlibrary2.stream.StreamType; /** * 実行回数が指定の回数の倍数の場合に分岐するコンポーネント */ public class BranchByExecCountComponent extends SimpleBranchComponent { public static final String COMPONENT_NAME = "BranchByExecCount"; public String getComponentName() { return COMPONENT_NAME;} private IntegerProperty _branchCountProp = new IntegerProperty("BranchCount", true, false); public BranchByExecCountComponent() { super(StreamType.ALL, StreamType.ALL, StreamType.ALL); registProperty(_branchCountProp); } public boolean execute(ExecuteContext context) throws FlowException { StreamDataObject is = getInputConnector().getStream(); boolean bBranch = false; long execCount = getExecuteCount(); long branchCount = _branchCountProp.longValue(); if (execCount % branchCount == 0) bBranch = true; setOutputStream(bBranch, is); return true; } }
SimpleBranchコンポーネントではgetStateArrayメソッドは「default」と「branch」というふたつの値を返します。
そしてsetOutputStreamメソッドがStateの設定とそのStateに対応するComponentExitに対する出力ストリームの設定を一度にやっています。
「branch」に対応するComponentExitに出力ストリームをセットする場合は第1引数をtrueとします。
続いてこのコンポーネントの定義ファイルを以下に示します。
<xsc lang="ja" xmlns="http://www.infoteria.com/asteria/flowengine/definition"> <Component category="Sample" icon="" name="BranchByExecCount" toolTip="実行回数による分岐"> <Class>BranchByExecCountComponent</Class> <Property displayName="汎用" mapping="false" name="Exception" toolTip="Exception" type="exception"/> <Property displayName="分岐する実行回数" name="BranchCount" mapping="false" type="int">1000</Property> <Input accept="ALL"/> <Output streamPassThrough="true"/> <Output location="right" ref="true" state="branch"/> </Component> </xsc>
分岐コンポーネントでは分岐に対応する数だけOutput要素を定義します。
そしてstate属性に対応するState名を、location属性に出力コネクタの位置を示します。
(省略時にはstate属性は「default」に、location属性は「bottom」になります。通常のシンプルコンポーネントでは
stateやlocationは定義されていないのはこれらの定義を省略しているためです。)
またref属性はのコネクタの出力ストリーム定義は他のコネクタの定義を参照していることを示しています。
refの属性値には正式にはState名とコネクタ名で参照するコネクタを記述しますが、出力コネクタがふたつしかない場合は「true」とすることでdefaultのコネクタを指すこともできます。
このコンポーネントでは、どちらの出力コネクタでも入力ストリームをそのまま出力しているので、両方のOutput要素で「streamPassThrough="true"」とすることも
できますが、その場合ストリームペインがタブ化されて出力ストリーム定義がふたつ表示されます。
つまりそれぞれの定義には次のような意味の違いがあります。
どちらの定義内容であってもサーバー側のコンポーネントの動作は変わりませんが、前者の定義の方が情報量が少ない分フロー開発者に優しいと言えます。
このようにストリーム定義は出力コネクタごとに持たせることができるので分岐するそれぞれのコネクタで出力ストリームが異なるようなコンポーネントも作成可能です。
入出力が複数あるコンポーネントとはComponentEntrance(あるいはComponentExit)にサブコネクタがあるコンポーネントのことです。
サブコネクタは複数定義することができ、コンポーネントのコンストラクタで追加します。
通常はSimpleComponentを継承して、そのコンストラクタでサブコネクタの追加を行います。
public MultiInOutComponent() { //入力サブコネクタの作成(SimpleMailコンポーネントから抜粋) InputConnector sub = new InputConnector(StreamType.ALL, false); sub.setAcceptLinkCount(InputConnector.LINK_UNBOUNDED); sub.setAcceptContainer(true); sub.setExpandContainer(true); getComponentEntrance().addSubConnector("Attachment", sub); //出力サブコネクタの作成(RecordFilterコンポーネントから抜粋) OutputConnector sub = new OutputConnector(at, false); getComponentExit(STATE_DEFAULT).addSubConnector("Unmatch", sub); ... }
フローを作成する場合入出力共にデフォルトコネクタにはリンクが必須ですが、サブコネクタではリンクを必須とするかどうかを指定することができます。
標準提供されている複数入出力をサポートするコンポーネントであるメールコンポーネント、RecordFilterコンポーネントともにデフォルトではサブコネクタはアイコン上に
表示されていませんが、コンポーネントのソース側ではそれらのサブコネクタは常に存在します。(デザイナー上でサブコネクタが表示されているかどうかはコンポーネント
の動作には関係がありません。)
サブコネクタからの入力ストリームの取得や出力ストリームの設定はデフォルトコネクタの場合と同じようにexecute(またはexecuteLoop)内で行います。
必須ではない出力サブコネクタでストリームの設定が必要ない場合はnullを設定してください。
nullが設定されたサブコネクタにリンクされた後続のフローは実行されません。
つまり存在する場合もない場合もあるデータ(例えばメールの添付ファイルのようなデータ)をサブコネクタに設定し、データがある場合だけ後続のフローを実行させることができます。
サブコネクタは定義ファイルではInputまたはOutput要素の子要素にSubConnector要素として定義します。
SubConnectorのname属性にはソースコードでaddSubConnectorした時に使用した名前を指定します。
SubConnector要素の定義内容はlocationが指定できない以外、InputまたはOutput要素と同じです。
locationが指定できないのでサブコネクタはアイコン上では常にデフォルトコネクタと同じ辺に表示されます。
このためサブコネクタのあるコンポーネントは通常のコンポーネントよりもアイコンサイズが大きくなるので、
IconReplaceプラグインを使うなどして表示するアイコンを調整する必要があります。
ExcelInput/ExcelOutputコンポーネントのように設計時に動的にコネクタ数が変わるようなコンポーネントも作成可能です。
この場合のコンポーネントおよび定義ファイルの作成は以下のように行います。
MultiConnectorComponentは入出力コネクタを動的に増減するための仕組みをあらかじめ備えた抽象クラスです。
コンストラクタで第1引数(bInput)をtrueにした場合は、「InputCount」というプロパティが追加され、
第2引数(bOutput)をtrueにした場合は、「OutputCount」というプロパティが追加されます。
コンパイル時にはこれらのプロパティの設定値を見て自動的にコネクタが追加されます。
ConnectorControllerはプロパティ値に応じてデザイナー上でのコンポーネント表示を変更するプラグインです。
このPropertyListenerを使用することでExcelコンポーネントのような動的なコネクタ数の増減が実現できます。
定義ファイルに以下の定義を加えれば出力数プロパティの設定値に応じて出力コネクタ数が増減することが確認できます。
<Property displayName="出力数" name="OutputCount" type="int" >1</Property> <PropertyListener class="com.infoteria.asteria.flowbuilder2.plugin.ConnectorController" mode="output" target="OutputCount"/>
通常は出力数を示すプロパティをインスペクタ上に表示してフローの開発者に直接設定させることはあまりなく、 Excelコンポーネントのように非表示プロパティとした上で別途コンポーネントをダブルクリックした時に起動するコンポーネントエディタから制御します。
コンポーネントの実行中に何らかの例外が発生した場合はFlowException(のサブクラス)をthrowします。
通常、コンポーネントがExcepitionを投げる場合はComponentExceptionまたはComponentExceptionByMessageCodeを作成してthrowします。
FlowExceptionのサブクラスには他にStreamで例外が発生した場合のStreamException、Connectionで例外が発生した場合のFlowConnectionExceptionなどがあります。
これらのExceptionはコンポーネント開発者が意図しない場合でも発生する可能性があります。
(例えばXMLストリームの生成でbyte[]を引数として作成する場合に、ウェルフォームドなXML以外のbyte列を引数とするとStreamExceptionとなります。)
コンポーネントの実行でExceptionが発生した場合、それはExceptionPropertyに設定されたエラー処理フロー、あるいは組み込みのエラー処理によって処理されます。
ExceptionPropertyはException発生時の処理をフロー開発者に設定させるためのプロパティで、フローまたはコンポーネントで設定できます。
ExceptionPropertyでキャッチできるのはFlowExceptionだけです。
それ以外のException(RuntimeException)が発生した場合はバグ扱いとなりますので、エラー処理には回されずフローが異常終了します。
FlowExceptionにはStateというExceptionの種類を表す属性があります。特に設定しない限りStateの値はFlowException#STATE_DEFAULT(=0)という値になります。
StreamExceptionなどのコンポーネント開発者の意図と関係なく発生する可能性のあるExceptionのStateはすべてFlowException#STATE_DEFAULTとなっています。
またExceptionPropertyにもState属性があり、FlowExceptionのStateと対応しています。
「State=1」で作成されたExcpetionPropertyは「State=1」のFlowExceptionだけをキャッチし、それ以外のStateのFlowExceptionは無視します。
原則的にはこのようにExceptionPropertyは対応するStateのExceptionのみをキャッチしますが、例外的に「State=0」のExceptionPropertyだけはあらゆるStateのExceptionをキャッチします。
SimpleComponentとSimpleBranchComponentには最初から「State=0」のExceptionPropertyが「Exception」というプロパティ名で登録されているので、
これを継承してコンポーネントを作成する限りコンポーネント開発者が自分で汎用のExceptionPropertyを追加する必要はありません。
(ただし定義ファイルの方では汎用ExceptionもProperty要素で定義しなければなりません。)
エラー処理を種類によって別に設定させる必要がなければ、これ以外の作業は必要ありませんが、特定のExceptionでの処理を別に設定させたい場合にはそれに対応するExceptionPropertyをコンポーネントに追加します。
例えばFileGetコンポーネントではファイルが見つからなかった場合には「State=1」のExceptionがthrowされます。
この時の実行エンジンでのエラー処理の流れは以下のようになります。
エラーメッセージなどのリソースはソースファイル中に直接書くこともできますが、定義ファイル中にMessage要素として記述することでソースと分離することができます。
定義ファイルではロケールを指定して複数の定義を持たせることができるので、そうすることによってロケールに応じて出力メッセージを切り替えることができます。
定義ファイル中に記述したメッセージはComponent#getMessageメソッドにより取得できます。
<!-- Message要素で定義した内容はComponentのソースファイル中で String filename; String msg = getMessage("1", filename); のようにパラメーター(%1〜%3)を置換しながら取得することができます。 --> <Message key="1">ファイルパスが不正です : %1</Message>
ComponentExceptionByMessageCodeはこのメッセージリソースの取得を簡単に行えるようにしたFlowExceptionサブクラスです。
(Message要素の内容はエラーメッセージ以外にもログ出力などの他の用途に使用しても構いません。)
FlowExceptionにはここまで説明してきた以外に設定可能な項目がいくつかあります。
以下にそれぞれの設定項目について簡単に説明します。
また、これらの情報をいつ設定するかについて、次の二つのタイミングがあります。
これらの機能をどのように使用するかはコンポーネント開発者の設計次第です。
エラー発生時にどのような情報をエラー処理フローに引き渡したいかを検討した上で設計を行ってください。
トランザクション処理はTransactionインターフェースを介して行われます。 Transactionインターフェースは以下に示すようにcommitメソッドとrollbackメソッドを持つインターフェースです。
package com.infoteria.asteria.flowengine2.execute; import com.infoteria.asteria.flowlibrary2.FlowException; public interface Transaction { public void commit(ExecuteContext context) throws FlowException; public void rollback(ExecuteContext context) throws FlowException; }
トランザクションをサポートするコンポーネントではexecute(またはexecuteLoop)メソッド内でトランザクションマネージャーに Transactionを追加していきます。
トランザクションの実装ではTransactionインターフェースを継承したExtendedTransactionとして実装することも可能です。
ExtendedTransactionでは通常のTransactionと以下の点が異なります。
フローのトランザクションの管理はトランザクションマネージャーによって行われます。
サブフローやエラー処理フロー、Nextフローまで含む一連のフローの実行はひとつのトランザクションマネージャーによって管理されます。
トランザクションマネージャーへのトランザクションの追加では原則的に、追加されたトランザクションがその順番で
トランザクションマネージャー内に積み上がっていきます。
ただし拡張トランザクションはトランザクションマネージャー内の先頭にある拡張トランザクションコーディネーターに積まれます。
また同じTransactionインスタンスは一度しかトランザクションマネージャーに積み上がりません。
拡張トランザクションコーディネーターはそれ自体がTransactionインターフェースを実装したクラスであり、常にトランザクションマネージャー
の先頭に位置します。
つまり拡張トランザクションは常に通常のトランザクションよりも前に積まれます。
それぞれの拡張トランザクションはその優先順位に従って拡張トランザクションコーディネーター内に積み上がっていきます。
拡張トランザクションコーディネーターに拡張トランザクションが追加された時にはそのstartメソッドが実行され、
拡張トランザクションコーディネーターのcommit時には内部に積まれている拡張トランザクションのprepareメソッドが順次実行されてから
commitメソッドが順次実行されます。
積上げられたトランザクションがどういうタイミングでコミット(またはロールバック)されるかはStartコンポーネントの トランザクション化プロパティによって決まります。
「トランザクション化=はい」の場合 | トランザクションマネージャーはフローの実行終了時に、 その終了コンポーネントのトランザクションプロパティの設定にしたがって、 コミットあるいはロールバックされる。 |
---|---|
「トランザクション化=いいえ」の場合 | トランザクションマネージャーは1コンポーネント実行する度に、
毎回コミットされる。 (実行したコンポーネントがトランザクションを使用しないコンポーネントの場合は、 トランザクションマネージャーには何も積みあがっていないので何もしない) |
「トランザクション化=はい」とした場合は、そこから呼び出されるサブフローやエラー処理フローでは、そのフローのStartコンポーネントの設定に
関わらずトランザクション化されます。
つまり親フローのみがトランザクション化され、子フローのみ非トランザクション化という状態はありえません。
逆に親フローの「トランザクション化=いいえ」の場合はサブフローやエラー処理フローのみをトランザクション化させることは可能です。
トランザクション化の状態によってExceptionが発生した場合のエラー処理の実行シーケンスも異なります。
「トランザクション化=はい」の場合 | トランザクションマネージャーに対しては何の操作もおこなわれないままエラー処理フローが実行されます。 つまり積上げられたトランザクションはエラー処理フローに引き継がれます。 |
---|---|
「トランザクション化=いいえ」の場合 | トランザクションマネージャーがロールバックされてからエラー処理フローが実行されます。 |
エラー処理フローが設定されていない場合はいずれの場合もトランザクションマネージャーはロールバックされ、異常終了します。
トランザクションをサポートするコンポーネントを作成する場合、そのexecuteまたはexecuteLoopメソッド内でExecuteContext#addTransactionメソッドで、 作成したトランザクションをトランザクションマネージャーに積上げていきます。
この時に重要なのはトランザクションマネージャーの「同じインスタンスのトランザクションは一度しか積みあがらない」という仕様です。
この仕様はフローでループが発生する場合や複数のコンポーネントがひとつのトランザクションを共有する場合に大きな意味を持ちます。
トランザクションの仕組みを理解するためにトランザクション処理の内容としてログ出力を行う次のような2つのコンポーネントを考えてみます。
/** * Transactionとして追加されるのはComponent自身。 * つまりループの中で複数回コンポーネントが実行される場合は毎回同一インスタンスが * ExecuteContext#addTransactionに渡される */ class ComponentA extends SimpleComponent implements Transaction { public String getComponentName() { return "ComponentA";} public boolean execute(ExecuteContext context) throws FlowException { context.addTransaction(this); passStream(); return; } public void commit(ExecuteContext context) throws FlowException { //ExecuteContext#infoはログに情報を出力するメソッド context.info("ComponentA commit"); } public void rollback(ExecuteContext context) throws FlowException { context.info("ComponentA rollback"); } } /** * 追加するTransactionを毎回 new しているので常に異なるインスタンスとなる */ class ComponentB extends SimpleComponent { public String getComponentName() { return "ComponentB";} public boolean execute(ExecuteContext context) throws FlowException { context.addTransaction(new MyTransaction()); passStream(); return; } private static class MyTransaction implements Transaction { public void commit(ExecuteContext context) throws FlowException { context.info("ComponentB commit"); } public void rollback(ExecuteContext context) throws FlowException { context.info("ComponentB rollback"); } } }
この2つのコンポーネントを用いて次のようなフローを書いたとします。
Start(トランザクション化=はい) ↓ LoopStart(ループする回数=3) ↓ ComponentA ↓ ComponentB ↓ End
するとログへの出力結果は次のようになります。
情報(End1) : ComponentA commit 情報(End1) : ComponentB commit 情報(End1) : ComponentB commit 情報(End1) : ComponentB commit
トランザクションマネージャーのコミットはフローの終了時(Endコンポーネント実行後)に一度だけ行われています。
ComponentA、ComponentBともにループの中で3回実行されていますが、ComponentAが積んだTransactionはひとつだけであり、
ComponentBは3つのTransactionを積み上げていることがわかります。
ちなみにこのフローをStart(トランザクション化=いいえ)として実行すると、結果は以下のようになります。
情報(ComponentA1) : ComponentA commit 情報(ComponentB1) : ComponentB commit 情報(ComponentA1) : ComponentA commit 情報(ComponentB1) : ComponentB commit 情報(ComponentA1) : ComponentA commit 情報(ComponentB1) : ComponentB commit
トランザクション化=いいえの場合はコンポーネント実行直後にトランザクションマネージャーがコミットされるので、 ComponentAが2回目に実行される際には以前に追加したTransactionは残っておらず、 再度ComponentA自身がTransactionとして追加されているのです。
拡張トランザクションでは障害発生時のリカバリー処理を実装することができます。
ここで障害とは以下の二つの場合です。
この場合は再起動時に保存されているトランザクションステータスを元にリカバリーが実行されます。
リカバリー処理は1度だけ行われリトライは行われません。
この場合FSMCでの設定に従って既定の回数リトライが行われます。
いずれの場合もリカバリー時に実行されるのはRecoveryData#recoveryメソッドです。
リカバリーをサポートする拡張トランザクションを作成する場合は以下の要件を満たす必要があります。
現在標準でExtendedTransactionを実装しているトランザクションはRDBに対するトランザクションのみで リカバリー可能なExtendedTransactionはXA対応のRDBトランザクションのみです。
標準で提供されているコンポーネントでトランザクションを実装しているものには以下のようなものがあります。
コンポーネント | コミット | ロールバック |
---|---|---|
RDBGet RDBPut SQLCall | DBMSをコミットする | DBMSをロールバックする |
FileGet FTPGet | 「コミット時の処理=ファイルを削除」の場合、 ファイルを削除する | 何もしない |
FileGet FTPGet | 「コミット時の処理=ファイルを削除」の場合、 ファイルを削除する | 何もしない |
POP3 IMAP4 | 「コミット時の処理=サーバーからメッセージを削除」の場合、 メッセージを削除する | 何もしない |
RDBGetやRDBPutなどのコネクションを扱うコンポーネントでは、同一コネクションを使用する場合には同じトランザクション インスタンスを使用するので、複数のRDB系コンポーネントを配置してもトランザクション化されている場合は一連トランザクションになります。
「トランザクション」というとDBMSに対するトランザクションのイメージが強いですが、 フローサービスのトランザクションは考え方によっては実行タイミングの異なる処理を記述できる仕組みと捉えることもできます。
Connectionとはコンポーネントから参照できる定義体のセットであり、またその定義情報を基に生成されるクラスをラップするクラスでもあります。
フローサービスではRDB、SMTP、HTTP、FTPなどの接続情報がConnectionとして定義され、FSMCあるいはデザイナーより設定可能です。
これらのコネクションにそれぞれ対応するConnectionクラスがあり、それぞれになんらかのクラスがラップされているわけですが、 現在はRDBConnectionと汎用コネクション以外のConnectionクラスの仕様は一般公開されていません。
Connectionを使用するクラスではコネクション種別を指定して、ConnectionPropertyを作成します。
ExecuteContext#getConnectionメソッドにコネクション名が設定されたConnectionPropertyを渡すとそれに対応するConnectionクラスが返されます。
(対応するコネクションが見つからない場合はExceptionとなります。)
ここで返されるのはコネクション種別毎に異なるConnectionインターフェースの実装クラスなので、実際のクラスにキャストして使用します。
RDBコネクションを使用する場合は実際に返されるクラスはRDBConnectionですので、それにキャストしてそこからJDBCのConnectionを取得します。
またConnectionを使用する場合はそのTransactionオブジェクトはExecuteContext#getConnectionTransactionメソッドで取得します。
このメソッドでは同じコネクションが使用される場合には同じTransactionインスタンスが返されるので、複数のコンポーネントが同じコネクションを
使用している場合でもそれらを一連トランザクションとすることができます。
private ConnectionProperty _connectionProp = new ConnectionProperty(RDBConnectionEntry.TYPE, "Connection"); /** * executeメソッドの先頭で使用しているConnectionに対応するTransactionを追加 */ public boolean execute(ExecuteContext context) throws FlowException { RDBConnection rcon = (RDBConnection)context.getConnection(_connectionProp); java.sql.Connection con = rcon.getConnection(); ... context.addTransaction(context.getConnectionTransaction(_connectionProp)); } */
JDBCのConnectionを取得した後はそれを使用して自由にJDBCのプログラミングを行って構いませんが、Connection#closeメソッドを実行してはいけません。
何故なら取得したConnectionはフロー中の別コンポーネントによって使用される可能性があり、さらにはコネクションプールによってそれ以上に再利用される可能性があるからです。
また、Connection#commit/Connection#rollbackメソッドも実行してはいけません。これらはフローのトランザクションフレームワークの中で実行されるべきメソッドだからです。
(特にXA対応のRDBConnectionの場合これらのメソッドをXAResourceを介さずに実行した場合の動作はJDBCドライバーの実装依存になります。)
逆にRDBConnectionを使用しているにも関わらずそのTransactionをTransactionManagerに追加しなかった場合は、トランザクションの整合性が維持されなくなります。
ストリーム作成とコネクション使用の例として指定のテーブルの列情報を出力するコンポーネントの例を以下に示します。
このコンポーネントではDBMSに対する更新処理は行っていないので、本当ならトランザクションを使用する必要はないのですが、
例としてConnectionのトランザクションをトランザクションマネージャーに積んでいます。
(DBMSに対して余分なcommitが発行されることになりますがそれ以外の害はありません。)
import com.infoteria.asteria.flowengine2.execute.ExecuteContext; import com.infoteria.asteria.flowengine2.flow.InputConnector; import com.infoteria.asteria.flowlibrary2.FlowException; import com.infoteria.asteria.flowlibrary2.component.SimpleComponent; import com.infoteria.asteria.flowlibrary2.component.ComponentException; import com.infoteria.asteria.flowlibrary2.stream.*; import com.infoteria.asteria.flowlibrary2.property.*; import com.infoteria.asteria.connection.RDBConnection; import com.infoteria.asteria.connection.RDBConnectionEntry; import com.infoteria.asteria.value.Value; import java.sql.*; import java.util.ArrayList; /** * 指定のTableのカラム情報をCSVまたはRecord型の出力ストリームとして出力するコンポーネント<br/> * コンポーネント内での作成するストリームのフィールド数は固定(カラム名、データ型の2列)<br/> * なので定義ファイル側でフィールド定義を固定しておく<br/> * Tableが見つからない場合のExceptionは通常のExceptionとは別に「TableNotFoundException」という * プロパティでキャッチできることとする。 */ public class TableInfoComponent extends SimpleComponent { public static final String COMPONENT_NAME = "TableInfo"; public String getComponentName() { return COMPONENT_NAME;} private static final int TABLE_NOT_FOUND = 1; /** * このコンポーネントが宣言されているmscファイルに <Message key="1">テーブルが見つかりません: %1</Message> <Message key="2">出力ストリームのフィールド定義が不正です</Message> というエントリを追加すること */ private static final String MSG_TABLE_NOT_FOUND = "1"; private static final String MSG_INVALID_FIELDDEF = "2"; private ConnectionProperty propConnection = new ConnectionProperty(RDBConnectionEntry.TYPE, "Connection"); private StringProperty propTableName = new StringProperty("TableName", true, true); private ExceptionProperty propTableNotFound = new ExceptionProperty("TableNotFoundException", TABLE_NOT_FOUND);//第2引数には0以外の任意の数値を指定 private RDBConnection _rcon = null; public TableInfoComponent() { //入力ストリームは使用しないので、すべてのストリームフォーマットを複数受け入れ可能とする //FileSystem(Get)などの入力ストリームを使用しない標準コンポーネントはすべてこのようにデザインされている getInputConnector().setAcceptLinkCount(InputConnector.LINK_UNBOUNDED); //getInputConnector().setAcceptContainer(true); //getInputConnector().setExpandContainer(false); getInputConnector().setAcceptType(StreamType.ALL); //出力ストリームはCSVまたはRecord getOutputConnector().setAcceptType(StreamType.CSV|StreamType.RECORDS); //プロパティの登録 registProperty(propConnection); registProperty(propTableName); registProperty(propTableNotFound); //「Exception」という名前のExceptionPropertyはSimpleComponentクラスでregistされている。 } /** コンポーネント初期化時にRDBConnectionを取得 */ public void init(ExecuteContext context) throws FlowException { _rcon = (RDBConnection)context.getConnection(propConnection); //propConnectionで指定されたConnectionが取得できない場合はここでFlowConnectionException( extends FlowException)が発生する //必要ならそのExceptionは「Exception」プロパティで指定したフローによってキャッチする。 } public void term(ExecuteContext context) { _rcon = null; //現状ではこの処理は必須ではないができるだけインスタンス変数は初期状態に戻すことが望ましい。 } public boolean execute(ExecuteContext context) throws FlowException { Connection con = _rcon.getConnection(); String table = propTableName.strValue(); try { //DatabaseMetaDataを使用してテーブル情報を取得しても良いが、ここではResultSetMetaDataからTable情報 //を取得することにした。 Statement stmt = con.createStatement(); ResultSet rs = null; try { String sql = "SELECT * FROM " + table + " WHERE 1 = 2"; rs = stmt.executeQuery(sql); } catch (SQLException e) { //ここでキャッチされた例外はテーブルが見つからなかった場合と思われる。 //ここでthrowするExceptionだけStateを「1」に。 throw new ComponentException(getMessage(MSG_TABLE_NOT_FOUND, table), TABLE_NOT_FOUND); } ResultSetMetaData meta = rs.getMetaData(); StreamFactory factory = getOutputConnector().getStreamFactory(); //定義されたフィールド数が2列でなければExceptionとする。 if (factory.getFieldDefinition().getFieldCount() != 2) throw new ComponentException(getMessage(MSG_INVALID_FIELDDEF)); StreamDataObject os = null; switch (factory.getType()) { case StreamType.CSV: os = createCSV(meta); break; case StreamType.RECORDS: os = createRecord(meta); break; default: //このコードが実行されることはない throw new IllegalStateException(); } setOutputStream(os); //使用したコネクションをTransactionManagerに追加する context.addTransaction(context.getConnectionTransaction(propConnection)); } catch (SQLException e) { throw new ComponentException(e); } return true; } /** CSVストリームの作成 */ private StreamDataObject createCSV(ResultSetMetaData meta) throws FlowException, SQLException { int len = meta.getColumnCount(); //CSV文字列の作成 StringBuffer buf = new StringBuffer(512); for (int i=1; i<=len; i++) { buf.append(meta.getColumnName(i)).append(','); buf.append(meta.getColumnTypeName(i)).append('\n'); } StreamFactoryCSV factory = (StreamFactoryCSV)getOutputConnector().getStreamFactory(); return factory.create(buf.toString()); } /** Recordストリームの作成 */ private StreamDataObject createRecord(ResultSetMetaData meta) throws FlowException, SQLException { int len = meta.getColumnCount(); //Record用のListの作成 ArrayList list = new ArrayList(len); for (int i=1; i<=len; i++) { Value[] values = new Value[2]; values[0] = new Value(meta.getColumnName(i)); values[1] = new Value(meta.getColumnTypeName(i)); list.add(values); } StreamFactoryRecord factory = (StreamFactoryRecord)getOutputConnector().getStreamFactory(); return factory.create(list); } /** ExceptionParamとして「TableName」を設定 */ public void setExceptionParam(FlowException e) { e.addParam(propTableName.getName(), propTableName.getValue()); } }
汎用コネクションは任意の名前と値のセットをコネクション定義として保存しておける仕組みです。
またコンポーネントで生成したオブジェクトを汎用コレクションに関連付けてプールしておくこともできます。
汎用コネクションを利用した典型的なコードは以下のようになります。
private ConnectionProperty _connectionProp = new ConnectionProperty(CommonConnectionEntry.TYPE, "Connection"); public boolean execute(ExecuteContext context) throws FlowException { CommonConnection con = (CommonConnection)context.getConnection(_connectionProp); MyObject obj = (MyObject)con.getRelatedObject(); if (obj == null) { obj = createObject((CommonConnectionEntry)con.getEntry()); con.setRelatedObject(obj); } //ToDo ここでMyObjectを使用した処理を実装 return true; } private MyObject createObject(CommonConnectionEntry entry) { //コネクション定義で設定したパラメーターの取得 Map map = entry.getParameterMap(); String value1 = (String)map.get("AAA"); String value2 = (String)map.get("BBB"); //取得したパラメーターを利用してオブジェクトを生成 return new MyObject(value1, value2); }
実行中にExecuteContextから同じ名前で取得されるConnectionは常に同一インスタンスです。
コードの中でコネクションに関連付けたオブジェクトを、存在すればそれを利用、なければ生成としているので
複数のコンポーネントが同じコネクション名を参照することによって同一のオブジェクトを共有することができます。
またコネクション定義でコネクションプールを設定しておけば生成したオブジェクトを一定期間プールしておくことも可能です。
(プール期間の設定はFSMCで行います。)
この場合コネクションに関連付けるオブジェクトにClosableインターフェースを実装しておけば、プールから解放されるタイミング
で実行されるコールバックを実装することができます。
(コネクションプールを使用しない場合はClosable#closeメソッドはフローの実行終了時に実行されます。)
pluginCallメソッドはデザイナーで作成したプラグインからのリクエストを受けて実行されるコールバックメソッドです。
レスポンスは任意のXML形式で返すことができます。
つまりプラグイン開発時にはこの仕組みを利用してサーバー側でコードを実行してその結果をデザイナーに返すことができます。
(pluginCallが実行されるのはフローの実行時ではなく設計時である点が他のコールバックメソッドと大きく異なります。)
また、pluginCallでStringの配列を返すレスポンスを作成しておけばそのString配列はデザイナー側でpluginCallPropertyを定義することに よりドロップダウンリスト形式のプロパティにすることができます。
pluginCallの詳細についてはプラグイン開発者ガイドを参照してください。
IndependentMapとはExecuteContext#getIndependentMapメソッドで取得できるMapのことです。
このMapはフローの実行開始から終了までサブフローやエラー処理フロー、Nextフローまで含めて引き回されます。
このMapを利用することで関連する複数のコンポーネントで任意のオブジェクトを共有することができます。
例えば特殊なDBMSを使用するコンポーネント群を作成する場合にConnectコンポーネントが作成した接続情報のクラスを
後続のコンポーネントが取り出して使用するような使い方ができます。
Mapのキーに使用する名前には厳密なルールはありませんが、重複を避けるためにJavaのパッケージ名と同様にドメイン名を 付加した名前を使用することを推奨します。
private MyObject _obj = null; //このコンポーネントがフローの中で最初に使用された場合だけMyObjectを生成し、 //後のコンポーネントではそれを使いまわす public void init(ExecuteContext context) throws FlowException { String key = getClass().getName(); _obj = (MyObject)context.getIndependentMap().get(key); if (_obj == null) { _obj = createMyObject(); context.getIndependentMap().put(key, _obj); } }
サブフローやエラー処理フロー、Nextフローを含む一連のフローはひとつのスレッドで実行されるので、 上のコードではマルチスレッドについて考慮する必要はありません。
IndependentMapにputされたオブジェクトがReleasableインターフェースを実装していた場合は、 リクエスト終了時にそのreleaseメソッドがコールバックされます。
package com.infoteria.asteria.flowengine2.execute; /** * リソース開放のためのインターフェース。
* リクエスト終了時にExecuteContext#getIndependentMap内にあるオブジェクトが * このインターフェースを実装していた場合、releaseメソッドがコールバックされます。 */ public interface Releasable { /** リソースを開放します。 */ public void release(ExecuteContext context); }
例えば先にあげた特殊なDBMSの接続情報をIndependentMapに入れて使いまわすなら、そのクローズは
リクエスト終了時が適切なのでこのインターフェースを実装するようにしてください。
(Component#termメソッドはNextフローの実行よりも先にコールバックされるため適切ではありません。)
コンポーネントのexecute(またはexecuteLoop)の処理がある程度時間がかかることが予想される場合、
強制終了に対処するためのコードを実装することが望ましいです。
キャンセル処理が適切に実装されていない場合、強制終了を実行したにも関わらずのにフローがいつまでたっても終了しないという
状況が発生する可能性があります。
例として次のようなコンポーネントの実行処理を考えます。
この処理の実装イメージを以下に示します。
(実際の更新処理の実装はこの章の趣旨ではないので省略しています。)
import javax.sql.PreparedStatement; ... /** * 入力レコードでループを回し処理を順次実行する */ public boolean execute(ExecuteContext context) throws FlowException { PreparedStatement stmt = getStatement(); Record record = getInputConnector().getStream().getRecord(); while (record != null) { doUpdate(stmt, record); record = record.nextRecord(); } passStream(); return true; } /** * 更新用のSQLを生成する */ private PreparedStatement getStatement() { ... } /** * DBに対してUpdateを実行する */ private void doUpdate(PreparedStatement stmt, Record record) { ... }
この処理の場合executeメソッドの実行開始から終了までに時間のかかる要因として次の2つが考えられます。
次節以降これらの場合に適切に強制終了が実行される方法を説明ます。
ExecuteContext#notifyRunningはフローの実行エンジンに対して処理の実行中であることを通知するためのメソッドです。
このメソッドはフローが強制終了された場合はExceptionをthrowします。
つまり時間のかかる処理の実行中に定期的にこのメソッドを実行すれば強制終了時にはそこで処理を抜けることができます。
/** * 入力レコードでループを回し処理を順次実行する */ public boolean execute(ExecuteContext context) throws FlowException { PreparedStatement stmt = getStatement(); Record record = getInputConnector().getStream().getRecord(); while (record != null) { doUpdate(stmt, record); record = record.nextRecord(); context.notifyRunning(); } passStream(); return true; }
cancelメソッドは強制終了時に指定のタイムアウト時間(デフォルト5秒)待ってもコンポーネントの実行が終わらなかった場合に 実行されるメソッドです。
この例の場合、テーブルに対してロックがかかっているとJDBCのPreparedStatement#executeUpdateメソッドで処理が停滞し いつまでたっても処理が終わりませんが、キャンセル時にPreparedStatment#cancelメソッドを実行することでSQLの実行を中断 させることができます。
private PreparedStatement _stmt = null;//cancelメソッドからアクセスできるようにメンバ変数にする /** * 入力レコードでループを回し処理を順次実行する */ public boolean execute(ExecuteContext context) throws FlowException { _stmt = getStatement(); try { Record record = getInputConnector().getStream().getRecord(); while (record != null) { doUpdate(_stmt, record); record = record.nextRecord(); context.notifyRunning(); } } finally { _stmt = null; } passStream(); return true; } /** * cancelメソッドの実装 */ public boolean cancel() { if (_stmt != null) { try { _stmt.cancel(); } catch (SQLException e) { } catch (NullPointerException e) { } } return true; }
cancelメソッド内で_stmtに対するnullチェックを行っているにも関わらずNullPointerExceptionをキャッチしていることに注意してください。
cancelメソッドはexecuteを実行指定しているスレッドとは別スレッドから実行されます。
このため先にnullチェックを行っていても次のステップに進むまでにexecuteメソッドが進行して_stmtにnullが代入されることがあり得ます。
このようにcancelメソッドの実装は否応なくマルチスレッドプログラミングになるのでシングルスレッドでの
処理のみを考えれば良いexecuteなどの実装とはやや性質が異なります。
※実際にはcancelメソッド内でException(RuntimeException)が発生した場合は実行エンジンによってcancel処理が実行されたものとして処理されます。
Component#internalInitはコンポーネントの初期化時に一度だけ実行されるメソッドです。
コンポーネントの初期化はそのコンポーネントを含むフローが最初にコンパイルされたタイミングで行われます。
つまり必要に迫られるまで初期化は遅延されているわけですが、定義ファイルでClass要素に「init="start"」という
属性をつけることでサーバー起動時に初期化(=internalInitメソッドの実行)を行うことも可能です。
例えばMutexコンポーネントはその初期化時にflow-ctrlにMutex関連のコマンドを追加しています。
(flow-ctrlへのコマンド追加の方法は現在公開されていません。)
Componentクラスにはサーバー終了時に実行されるコールバックメソッドはありません。
サーバー終了時になんらかの処理を行いたい場合はFinalizerインターフェースを実装したクラスを
作成し、FlowEngine#addFinalizerメソッドで直接FlowEngineに登録してください。
package com.infoteria.asteria.flowengine2.execute; /** * サーバー終了時に実行されるコールバックのためのインターフェース。
* サーバーの終了時にFlowEngine#addFinalizerで追加されたオブジェクトが * このインターフェースを実装していた場合、releaseメソッドがコールバックされます。 */ public interface Finalizer { /** リソースを開放します。 */ public void release(); }
Finalizerインターフェースは正常終了時の実行シーケンスの中で実行されるコールバックなので、 フローサービス自体が異常終了した場合は実行されません。
コンポーネントでライセンスチェックを行いたい場合は、ライセンスチェックに必要な情報、
例えばライセンスファイルはコンポーネントのjarファイルと同じ場所に置くことを推奨します。
コンポーネントのjarファイルは、
[data dir]/system/lib/components
に配置しますので、ライセンスファイルも同じフォルダーに配置します。
実際のライセンスチェックの処理は、Component#checkLicenseをオーバーライドして実装します。
/** ライセンスチェック処理を記述します */ public void checkLicense() throws LicenseException { }
このメソッドの中でComponent#getComponentLibPathメソッドを実行すると前述のcomponentsフォルダーのパスを取得できます。
これにより、同じフォルダーに配置したライセンスファイルを取得することができます。
ライセンスファイルを取得したら、後はライセンスチェック処理を実装します。
もしもライセンス違反となった場合は、LicenseExceptionをthrowします。
ライセンスチェックの基本的な実装は次のようになります。
public void checkLicense() throws LicenseException { // ライセンスファイルを取得します File licenseFile = new File(getComponentLibPath(), "ライセンスファイル名"); // licenseFileを読み込んでライセンスチェックを行います // ライセンス違反の場合にはLicenseExceptionをthrowします if (licenseError) { throw new LicenseException(); } }
どのような方法でライセンスチェックをする場合でも、ライセンス違反となった場合にはLicenseExceptionをthrowするようにしてください。
ComponentCompilerはその名の通りコンポーネントのコンパイラです。
具体的にはxfpファイル内のComponent要素を解釈してプロパティ値やストリーム定義を設定しているのがComponentCompilerです。
コンポーネントのコンパイル時の流れは以下のようになります。
つまりComponent#getCompilerメソッドをオーバーライドし、自作のComponentCompilerサブクラスを返すようにすることで
コンパイラの動作を拡張することができます。
コンパイラを拡張することで例えば以下のようなことが行えます。
ほとんどの場合はComponentCompilerのサブクラス化でオーバーライドしなければならないのはpostCompile(Map)メソッドだけです。
このメソッドはプロパティ値の設定やリンクの接続などのフローのコンパイルが完了した時に実行されるのでプロパティ値のチェックなどは
このメソッド内で行うことができます。
ComponentCompilerは独立クラスとして作成することも可能ですが、Componentのprivate classとして作成する方が プロパティ値やメンバー変数へのアクセスが簡単になり便利です。
フローの実行時にはコンパイルされたComponentがそのまま使用されるのではなく複製されます。
つまりマルチスレッドで同じフローが複数同時実行される場合でもそれぞれのComponentはすべて別のインスタンスになります。
複製の際に使用されるメソッドはcloneメソッドですが、Componentのcloneメソッドの実装は通常のJavaの Cloneableによる実装とは異なりClass#newInstanceメソッドを用いて実装されています。
以下にComponentクラスのcloneメソッドの実装の一部を以下に示します。
public Object clone() { Component ret = null; try { ret = (Component)getClass().newInstance(); } catch (IllegalAccessException e) { } catch (InstantiationException e) { } ret.setName(getName()); int cnt = ret.getPropertyCount(); for (int i=0; i < cnt; i++) { Property prop = ret.getProperty(i); prop.assign(getProperty(i)); } ... return ret; }
何故このような実装になっているかと言うと複製を作成する際にもComponentのコンストラクタを動かしたいからです。
そしてインスタンス作成後に登録されているプロパティの値をコピーすることで複製を作成します。
逆から言えば登録されているプロパティ以外のメンバー変数はコピーされません。
拡張コンパイラを使用してコンパイル時にインスタンス変数に何らかの値を設定した場合はComponent#clone
メソッドをオーバーライドして、そのコピーを自分で行う必要があります。
(cloneメソッド内では、super.clone()を必ず呼び出す必要があります。)
フロー内の各コンポーネントのインスタンスが別になっているため、通常コンポーネント開発者は スレッドセーフについてほとんど考える必要がありませんが、コンパイラ拡張によってメンバー変数でオブジェクトを 共有する場合はそのスレッドセーフ性についてはコンポーネント開発者が担保しなければなりません。
SDKで新たに公開されたIndependentMapやComponentCompilerを用いたサンプルとしてフローの同時起動数を制限する
コンポーネントのソースを以下に示します。
このソースについて詳しい解説は行いませんが、このコンポーネントはチュートリアルで作成したコンポーネントと
共にfcsample.jarに含まれていますので自分で改良しながら動作を確認してみてください。
package com.infoteria.sample.component; import com.infoteria.asteria.flowengine2.FlowEngine; import com.infoteria.asteria.flowengine2.execute.ExecuteContext; import com.infoteria.asteria.flowengine2.execute.Finalizer; import com.infoteria.asteria.flowengine2.execute.Releasable; import com.infoteria.asteria.flowengine2.flow.Component; import com.infoteria.asteria.flowengine2.flow.ComponentCompiler; import com.infoteria.asteria.flowlibrary2.component.ComponentExceptionByMessageCode; import com.infoteria.asteria.flowlibrary2.component.SimpleComponent; import com.infoteria.asteria.flowlibrary2.FlowException; import com.infoteria.asteria.flowlibrary2.property.ExceptionProperty; import com.infoteria.asteria.flowlibrary2.property.IntegerProperty; import com.infoteria.asteria.flowlibrary2.property.StringProperty; import com.infoteria.asteria.flowlibrary2.stream.StreamType; import java.util.Map; import java.util.HashMap; /** * フローの同時実行数を制御するコンポーネント<br/> * 同じIDを持つフローを同時に指定の個数までしか実行できなくする */ public class ThreadControlComponent extends SimpleComponent { private static ThreadControl __control; private static final String COMPONENT_DUPLICATE = "1"; private static final String LIMIT_OVER = "2"; private static final String INVALID_LIMIT = "3"; private static final String CURRENT_COUNT = "4"; private static final int STATE_LIMIT_OVER = 1; public static final String COMPONENT_NAME = "ThreadControl"; public String getComponentName() { return COMPONENT_NAME;} private StringProperty _idProp = new StringProperty("ID", false, false); private IntegerProperty _limitProp = new IntegerProperty("Limit", true, false, 1); private ExceptionProperty _limitOverProp = new ExceptionProperty("LimitOverException"); public ThreadControlComponent() { super(StreamType.ALL, StreamType.ALL); registProperty(_idProp); registProperty(_limitProp); registProperty(_limitOverProp); } public boolean execute(ExecuteContext context) throws FlowException { //ThreadControlのID String id = _idProp.strValue(); if (id.length() == 0) id = getOwnerFlow().getFullName(); int limit = _limitProp.intValue(); //IndependentMapのキー String key = getClass().getName(); ThreadControlDecrement dec = (ThreadControlDecrement)context.getIndependentMap().get(key); if (dec != null) throw new ComponentExceptionByMessageCode(this, COMPONENT_DUPLICATE);//このコンポーネントはリクエスト中に複数配置できません int n = __control.inc(id, limit); if (n < 0) throw new ComponentExceptionByMessageCode(this, LIMIT_OVER, STATE_LIMIT_OVER);//同時起動数をオーバーしました context.info(getMessage(CURRENT_COUNT, Integer.valueOf(n))); context.getIndependentMap().put(key, new ThreadControlDecrement(id)); passStream(); return true; } /** * コンパイル時にLimitプロパティの値をチェックして0以下ならばエラーとする */ protected ComponentCompiler getCompiler() { return new MyCompiler(this); } private class MyCompiler extends ComponentCompiler { public MyCompiler(Component c) { super(c); } protected void postCompile(Map componentMap) { int limit = _limitProp.intValue(); if (limit <= 0) { String msg = getMessage(INVALID_LIMIT); onComponentError(msg); } } } //初期化時にstaticなThreadControlオブジェクトを作成しFinalizerとして登録 public void internalInit() { __control = new ThreadControl(); FlowEngine.addFinalizer(__control); } private static class ThreadControl implements Finalizer { private Map _map = new HashMap(); public synchronized int inc(String id, int limit) { Integer n = (Integer)_map.get(id); if (n == null) { _map.put(id, Integer.valueOf(1)); return 1; } if (n.intValue() >= limit) return -1; n = Integer.valueOf(n.intValue() + 1); _map.put(id, n); return n.intValue(); } public synchronized int dec(String id) { Integer n = (Integer)_map.get(id); if (n == null) return 0; if (n.intValue() == 1) _map.remove(id); else _map.put(id, Integer.valueOf(n.intValue() - 1)); return n.intValue() - 1; } /** Finalizerインターフェース */ public synchronized void release() { _map.clear(); } } private class ThreadControlDecrement implements Releasable { private String _id; public ThreadControlDecrement(String id) { _id = id; } public void release(ExecuteContext context) { int n = __control.dec(_id); if (context.isDebugInfoEnabled()) { String msg = getMessage(CURRENT_COUNT, Integer.valueOf(n)); context.debugInfo(msg); } } } }
ComponentInvokerは既存のコンポーネントを独自コンポーネント内部から実行するためのユーティリティクラスです。
既存のコンポーネントインスタンスを取得し、Javaコードからプロパティ設定を行った上で実行させることができます。
ただし、実行可能なのは単純なProperty/SimpleCategoryPropertyのみから構成されているコンポーネントのみであり、
拡張コンパイラによってコンパイル時に特別な処理を行っているコンポーネントは実行することができません。
(ExcelコンポーネントやRDBPutコンポーネントなどがこれにあたります。)
以下にHttpGetコンポーネント、RDBGetコンポーネント、SimpleMailコンポーネントをComponentInvokerで実行するサンプルを示します。
以下のサンプルコードはJavaInterpreterコンポーネント上で実行可能です。
//HttpGetコンポーネントの実行 import com.infoteria.asteria.flowengine2.FlowEngine; import com.infoteria.asteria.flowengine2.execute.ComponentInvoker; import com.infoteria.asteria.flowengine2.flow.Component; import com.infoteria.asteria.flowlibrary2.stream.StreamFactory; import com.infoteria.asteria.flowlibrary2.stream.StreamType; String url = "http://www.yahoo.co.jp/"; Component c = FlowEngine.getComponentManager().getComponentInstance("HTTP(Get)"); ComponentInvoker invoker = new ComponentInvoker(c); invoker.setPropertyBoolean("UseConnection", false); invoker.setPropertyString("URL", url); StreamFactory sf = StreamFactory.getInstance(StreamType.MIME); invoker.setStreamFactory(sf); return invoker.execute(context, component.getInputConnector().getStream());
//RDBGetコンポーネントの実行 import com.infoteria.asteria.flowengine2.FlowEngine; import com.infoteria.asteria.flowengine2.execute.ComponentInvoker; import com.infoteria.asteria.flowengine2.flow.Component; import com.infoteria.asteria.flowlibrary2.stream.StreamFactory; import com.infoteria.asteria.flowlibrary2.stream.StreamType; import com.infoteria.asteria.flowlibrary2.stream.FieldDefinitionBuilder; import com.infoteria.asteria.flowlibrary2.stream.FieldType; import com.infoteria.asteria.value.Value; String connection = "RDB1"; String sql = "SELECT COL1, COL2, COL3 FROM TABLE1 WHERE COL1 = ?p1?"; Component c = FlowEngine.getComponentManager().getComponentInstance("RDB(Get)"); ComponentInvoker invoker = new ComponentInvoker(c); invoker.setPropertyString("Connection", connection); invoker.setPropertyString("SQL", sql); invoker.setCategory("SQLParameter", "p1", new Value("ABC")); StreamFactory sf = StreamFactory.getInstance(StreamType.RECORDS); FieldDefinitionBuilder builder = new FieldDefinitionBuilder(); builder.addField("field1", FieldType.STRING); builder.addField("field2", FieldType.STRING); builder.addField("field3", FieldType.STRING); sf.setFieldDefinition(builder.getDefinition()); invoker.setStreamFactory(sf); return invoker.execute(context, component.getInputConnector().getStream());
//SimpleMailコンポーネントの実行 import com.infoteria.asteria.flowengine2.FlowEngine; import com.infoteria.asteria.flowengine2.execute.ComponentInvoker; import com.infoteria.asteria.flowengine2.flow.Component; import com.infoteria.asteria.flowlibrary2.stream.StreamFactory; import com.infoteria.asteria.flowlibrary2.stream.StreamType; import com.infoteria.asteria.flowlibrary2.stream.FieldDefinitionBuilder; import com.infoteria.asteria.flowlibrary2.stream.FieldType; import com.infoteria.asteria.value.Value; String connection = "smtp1"; String from = "user1@xxxx.co.jp"; String to = "user1@xxxx.co.jp"; String subject = "てすと"; Component c = FlowEngine.getComponentManager().getComponentInstance("SimpleMail"); ComponentInvoker invoker = new ComponentInvoker(c); invoker.setPropertyString("Connection", connection); invoker.setPropertyString("From", from); invoker.setPropertyString("To", to); invoker.setPropertyString("Subject", subject); return invoker.execute(context, component.getInputConnector().getStream());
上記のサンプルでは単純に1コンポーネントを実行してその出力ストリームを返しているだけですが、この技術を使用すれば複数のコンポーネントの実行を組み合わせたものを 1コンポーネントにまとめることも可能です。
サンプルコードで概ねやっていることはわかると思いますが、実際のところはComponentInvokerを使用するためには
実行対象のコンポーネントについてプロパティ名などの内部仕様まで知っている必要があります。(必要な情報はほとんどコンポーネント定義ファイル(xscファイル)より得ることができます。)
またプロパティ設定が不適切だった場合にどのような動作になるかはそのコンポーネントの実装次第になるのでエラーメッセージが不親切だったりデバッグが難しい場合があります。
将来のバージョンでの仕様変更によって実行しているコンポーネントが拡張コンパイラを必要とする形に変更される可能性も0ではありません。
ご使用の際にはできるだけ個別にご相談ください。
その他サンプルとして標準で提供されているコンポーネントのいくつかのソースを公開します。