日々地道に成長

思いついたことを書いていきます。

【Embulk BigQuery】embulk-input-bigqueryとembulk-output-bigqueryでのエラーについて

お疲れ様です。コーヒーです。

 

今回はembulk-input-bigqueryとembulk-output-bigqueryでのエラーについてまとめてみます。

プラグインのURLは以下になります。

github.com

github.com

 

上記プラグイン使用時に不定期で起こるエラーに以下のものがあります。

org.embulk.config.ConfigException: OutputPlugin 'bigquery' is not found.
org.jruby.proxy.org.embulk.config.ConfigException$Proxy1: Unknown output plugin 'bigquery'. embulk/output/bigquery.rb is not installed. Run 'embulk gem search -rd embulk-output' command to find plugins.

こちらのエラーが出現時は基本的には再実行で対処は可能です。

しかし、定期的に実行する場合などはそのままではあまりよくありません。

また、上記のプラグイン使用時にデータ量によってはCPU/メモリ使用率が大幅に上がってしまいます。

エラー対処法

対処法としてはGCSを一度経由する方法があります。

流れとしては以下のようになります。

  1. embulk-input-gcsもしくはembulk-output-gcsを使用しデータを転送
  2. bqコマンド等でGCSからデータの読み込み

使用するプラグインは以下になります。

github.com

github.com

 

上記エラーやCPU使用率を改善したい場合は遠回りになりますが上記方法を試してみてもよいと思います!

今回の内容は以上となります。

最後まで読んでいただきありがとうございました。

【Digdag BigQuery】Digdagのbqコマンドを使用してクエリを実行する方法

お疲れ様です。コーヒーです。

 

今回はDigdagでのBigQueryの操作についてまとめていこうと思います。

DigdagでBigQueryを操作する方法は主に下記の3つがあります。

  1. DigdagのshコマンドからBigQueryのbqコマンドを使用する
  2. Digdagのbqコマンドを使用する

今回は2つ目のbqコマンドを使用する方法について解説していきます。

Digdagのbqコマンド概要について

Digdagのbqコマンドのリファレンスは以下になります。

docs.digdag.io

まず、こちらの方法について前提としてBigQueryの[location]が[asia-northeast1]の方は使用できません

理由として、現在(2021/03/31)はlocationの指定がオプションでできないためです。

そのため、[location]が[asia-northeast1](正確にはlocationの指定が必要な場合)は別の方法の使用を検討してください。

他の方法は以下のリンクで紹介しているため、よければ確認してみてください。

 

stady-diary.hatenablog.com

 

bqコマンドの実行方法

コードの使用方法としては以下のように使用します。

bq>: queries/step1.sql

[bq >]の形でsqlを指定して使用できます。

また以下のようにdigdag配下にディレクトリを設定して使用してください。
└── digdag
 ├── xxx.dig
 └── queries
  |── xxx.sql

digdag secretsの設定

Google BigqueryのJSON形式の秘密鍵をdigdag secretsの[gcp.credential]に設定しなければbqコマンドを使用できません。

Digdag シークレットのlocalに設定する場合は以下のように設定行ってください。

cat [service account.json] |digdag secrets --local --set gcp.credential=@[service account.json]

 

 

今回は以上になります。

最後まで読んでいただきありがとうございました。

【BigQuery】An internal error occurred and the request could not be completed」エラーについて&digdagのretry機能

お疲れ様です。コーヒーです。

 

今回はBigQueryの「An internal error occurred and the request could not be completed」エラーについてと、その対処法についてです。

エラーの状況について

  • digdag+embulkでbqのクエリを実行時発生
  • 発生タイミングは不定
  • 再実行時にはエラーが出ない

上記のような状況でエラーが起こっていました。

 

こちらのエラーについて調査したところ以下のような状況でした。

  • BQの内部エラーで起こる
  • 原因は現状不明
  • 再実行で解決可能

ただ毎回再実行しなくてはいけないのでは、定期的に動かすクエリとしては問題があります。

digdagの_retryオプション

今回はこの問題をdigdagのretryオプションで解決しました。

公式の文は以下になります。

docs.digdag.io

使用方法は簡単で、エラーが起きてしまった時にやり直ししてほしいタスクに[_retry]を設定するだけです。

+bq:
  _retry: 3

  +step1:
    sh>: bq query --use_legacy_sql=false < xxx.sq

上記はエラー時に3回までリトライするものになります。

 

ただこれだけだと、連続でクエリをやりなすのでエラーが連続で起きてしまう可能性があるため、以下のようにクエリをリトライするまでのインターバルを設定することで動作を安定させられます。

+bq:
  _retry:
    limit: 3
    interval: 60
    interval_type: exponential

  +step1:
    sh>: bq query --use_legacy_sql=false < xxx.sql

こちらはstep1タスクがエラー時に60秒間をあけて再実行するものになります。

[interval_type]をexponentialに設定すると、再実行までの間隔を失敗するごとに増やしてくれるものになります。

 

以上でbqのエラーをdigdagのオプションで解決する手段でした。

最後まで読んでいただきありがとうございました。

【Digdag Embulk】Digdag+Embulkで変数を使用してコードを分かりやすく変える【基礎】

お疲れ様です。コーヒーです。

 

今回はDigdag+Embulkをで変数を使用して、分かりやすく、使いまわしやすく、修正しやすいプログラムを目指していこうと思います。

前回作成した処理を変数を使用したものに変更していこうと思いますので、よければそちらも確認してみてください。

stady-diary.hatenablog.com

Digdagの変数について

Digdagで変数を定義するときには、主に「_export」で行います。

以下のように設定することができます。

_export:
  bq_json_keyfile: xxx.json
  bq_project: project_name
  path_prefix: /data/save/test.
  s3_bucket_name: s3_bucket_name
  s3_dir: s3://${s3_bucket_name}

よく使うであろうものを今回は変数として設定しています。

さらにこの情報を、config.digファイルという設定だけをまとめておくファイルを作って読み込むようにすればより汎用性が上がります。

 

_export:
  !include : config/config.dig

上記のような感じにすっきりまとめられます。

他のdigファイルから読み込むときは[!include]を使用します。

 

そして前回作成したdigファイルに反映すると以下になります。

+main:
#BQからEmbulkでcsvファイルを出力
  +BQ_output_csv:
    sh>: embulk run example.yml.liquid

#出力したファイルをS3に転送
  +s3_transfer:
    sh>: /usr/bin/aws s3 cp /data/save/test.csv ${s3_dir}/xxx/

digファイル上で変数を使用するには${}で使用できます。

digdagからembulkへの変数の受け渡し

digdagで先ほど定義した変数をembulkで使用する方法について説明します。

まず気を付けなければならないのが、embulkの拡張子を[.yml.liquid]に変更しておいてください。こちらの設定をしていないとうまく動作しません。

 

実際にembulkファイルに変数を受け渡したコードが以下になります。

in:
  type: bigquery
  keyfile: {{env.bq_json_keyfile}}
  project: {{env.bq_project}}
  location: asia-northeast1
  sql: |
    select
        *
    from
        xxx.xxx
out:
  type: file
  file_ext: csv
  path_prefix: {{env.path_prefix}}
  sequence_format: ''
  formatter:
    type: csv
    charset: Shift_JIS
    header_line : true
    newline: CRLF
    quote_policy: NONE

digdagの[_export]で設定した変数を、embulk上で{{env.xxx}}で受け取ります。

 

これで変数を使用しての、コード作成方法は以上になります。

最後まで読んでいただきありがとうございます!!

【Digdag Embulk】Digdag+Embulkを使用した処理方法【基礎】

お疲れ様です。コーヒーです。

 

今回はDigdag+Embulkを使用した、処理を行う具体例について解説していきます!

DigdagとEmbulkそれぞれについて、解説している記事もあるので合わせて読んでいただけると理解が深まると思います。

stady-diary.hatenablog.com

stady-diary.hatenablog.com

 

Digdag+Embulkを使用した処理の具体的な流れ

今回は以下の流れで処理を行っていきます。

  1. BigQueryからcsvファイルを出力(Embulk使用)
  2. 出力したファイルをS3に転送(s3コマンド)

上記のようにシンプルな流れで処理をおこないます。

全体的なプログラムを先に見せると以下のような感じになります。

timezone: 'Asia/Tokyo'

+main:
#BQからEmbulkでcsvファイルを出力
  +BQ_output_csv:
    sh>: embulk run example.yml.liquid

#出力したファイルをS3に転送
  +s3_transfer:
    sh>: /usr/bin/aws s3 cp /data/save/ s3://s3_bucket_name/xxx/

BigQueryからcsvファイルを出力(Embulk使用)

下記の部分にてEmbulkでexample.yml.liquidを実行してcsvファイルを出力しています。

#BQからEmbulkでcsvファイルを出力
  +BQ_output_csv:
    sh>: embulk run example.yml.liquid

ここで、example.yml.liquidの中身を見ていきます。

in:
  type: bigquery
  keyfile: xxx.json
  project: project_name
  location: asia-northeast1
  sql: |
    select
        *
    from
        xxx.xxx
out:
  type: file
  file_ext: csv
  path_prefix: /data/save/test.
  sequence_format: ''
  formatter:
    type: csv
    charset: Shift_JIS
    header_line : true
    newline: CRLF
    quote_policy: NONE

inではBigQueryの設定をおこなっています。

プラグインは[embulk-input-bigquery]を使用しています。

github.com

こちらのインストール方法は簡単で以下のコマンドを打ち込むだけです!

embulk gem install embulk-input-bigquery

各種設定はそれぞれの環境に合わせて書き直してください。

ただ、locationはデフォルトでは[US or EU]になっているので注意してください。

今回はsqlはxxxテーブルから全件抜き出すものになっていますが、こちらもsqlを張り付ければどんなものにも対応しています。

 

outでは自身のローカル環境への出力設定を行っています。

path_prefixに保存したい場所のURLを指定できます。

その他細かい設定等はそれぞれの状況に合わせてください。

出力したファイルをS3に転送(s3コマンド)

 以下の部分でローカル環境からs3にファイルコピーしています。

#出力したファイルをS3に転送
  +s3_transfer:
    sh>: /usr/bin/aws s3 cp /data/save/ s3://s3_bucket_name/xxx/

「sh>」オペレーターを使用して、S3コマンドを呼び出していました。

「sh>」オペレーターは、シェルスクリプトを実行してくれます。

S3コマンドについては以下を参照してください。

docs.aws.amazon.com

 

以上で今回の解説は終了です!

実はembulkから直接s3に送ることもできたりと、プラグインをいろいろと調べると便利なものがいっぱいあります。

それでは!

【Digdag】Digdag入門講座~インストール編~【初心者】

お疲れ様です。コーヒーです。

 

今回はDigdagについて、インストール方法ついて解説していきます。

Digdagの基本的な概念については前回説明しています。

stady-diary.hatenablog.com

Digdagのインストール

digdagのインストールについては、以下リンクの公式サイトの手順通りに行えばできます!

ですが、英語サイトのため詳しく説明いたします!

Getting started — Digdag 0.9.42 documentation

 linux/macでのインストール

以下コマンドを打つことでインストールが可能です。

#digdagのファイルをダウンロード
$ curl -o ~/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest"
#パーミッションの変更 $ chmod +x ~/bin/digdag
#pathの追加 $ echo 'export PATH="$HOME/bin:$PATH"' >> ~/.bashrc

次に、上記で設定したpathの変更を有効にするため以下のコマンドを入力します。

もしくは、ターミナルを再度開くでも問題ありません。

$ source ~/.bashrc

最後にインストールされているかを確認するために、以下のコマンドを実行してください。このコマンドでバージョンが表示されれば問題なくインストールが行えています。

digdag --version
 Windowsでのインストール

windowsではコマンドプロンプト及びpowershellで以下コマンドを実行することでインストールが行えます。

PowerShell -Command "& {[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::TLS12; mkdir -Force $env:USERPROFILE\bin; Invoke-WebRequest http://dl.digdag.io/digdag-latest.jar -OutFile $env:USERPROFILE\bin\digdag.bat}"

上記コマンドを入力することで「C:\Users\YOUR_NAME\bin」「digdag.bat」がダウンロードされていることが確認できると思います!

 

 確認が行えたら次に以下のコマンドでpathの設定を行います。

setx PATH "%PATH%;%USERPROFILE%\bin"

上記コマンド入力が行えたら、再度ターミナルもしくはPowershell起動し直してください。

その後、以下コマンドを入力してインストールが上手くいっているか確認してください。バージョンが表示されていれば成功です!!

digdag --version
  

今回の内容は以上となります!

次回はDigdagの使用方法について詳しく説明したいと思います!

【Digdag】Digdag入門講座~概要編~【初心者】

お疲れ様です。コーヒーです。

 

今回はDigdagについて、初心者による初心者向け解説をしてみようと思います。

Digdagの基本的な概念について説明していきます!!

 

Digdagとは

Digdagとは、いろんな操作を定義に従い自動実行してくれるワークフローエンジンと呼ばれるものです。

ワークフローエンジンには他にも以下のようなものがあります。

  • Airflow
  • Argo

Digdagの特徴としては以下のようなものがあります

  • 開発が容易である
  • tresure data社が開発したため、tresure dataとのコネクタが充実している
  • グループによりタスクを管理できる
  • プラグインによる拡張性
  • エラー時にエラー箇所から再実行可能
  • 様々なクラウドに対応(S3等)

Digdagの基本的な流れ

Digdagでのワークフローを定義する基本的な流れを説明します。

  1. [.dig]ファイルの作成
  2. [timezone]の設定
  3. [_export]でパラメータを設定
  4. [+]タスクの定義
  5. オペレータの使用

では、実際のDigdagを見つつ上記の流れを確認しましょう!

#timezonの設定
timezone: Asia/Tokyo
#パラメータの設定 _export: var: "digdag"
#タスクの定義 +task: echo>: start ${var}#オペレータの使用

上記のプログラムは[start digdag]と表示するだけのものとなっています。

今は流れだけ掴んでください!!

 

次回、digdagのインストールと詳しい使用方法について説明していきます!

最後まで読んでいただきありがとうございます!!