2022年12月17日に開催された「R研究集会2022」の発表資料です。 当日の投影内容から、メモリの消費量の算出方法に誤りがあったため一部内容を変更しています。 リポジトリ: h…
はじめに Pyspark(Spark MLlib)を用いた機械学習の一連の流れに関する実装を整理する。(scikit-learnはよく見かけるけどPysparkはあんまり見かけない。。。。) そのため、機械学習自体の中身については触れないし、自身の能力としても触れられない。 概要 SparkのMLlibにおいて機械学習の一連のワークフローを構成する要素は次の3個になる。これらの構成要素を用いて、前処理や学習を実装する。 Transformers Dataframeを入力とし、1個以上のカラムを追加したDataframeを出力する。(メソッドはtransform()) 入出力の処理は変換処理として定義されたもの(つまりはルールベース)が行わる。 例えば 複数カラムの特徴量を1カラムのベクトル化する(VectorAssemler) 学習済みモデルのTransformerでテスト用データを入力と
データウェアハウス開発部の高野です。現在はオンプレミスの電子カルテデータ基盤のAWS移行のプロジェクトに参画しています。 今年、JMDCではアドベントカレンダーに参加しています。 qiita.com 本記事は、JMDC Advent Calendar 2024 7日目の記事です。 はじめに 電子カルテデータ基盤のAWS移行を進めている中、オンプレミスの旧データ基盤のデータ移行が要件の1つとしてありました。AWSでは主なデータベースとしてAmazon Redshift Serverlessを採用しており、そちらに移行データを連携したい、データ移行に必要なデータ形式が様々だったことからデータ移行はAWS Glueジョブ(PySpark)を使って対応しました。データウェアハウス開発部ではSQLでのデータ変換が主流ですが、今回AWS Glueジョブ(PySpark)を使って良かった点について書いて
こんにちは。LINEヤフー株式会社ビジネスPF開発本部で LINE DMP の開発を担当している yamaguchi です。 この記事は、Testcontainers を活用して Apache Kyuubi を用いたユニットテスト環境をどのように構築したかを紹介します。 はじめに LINE DMP(Data Management Platform)は LINE 外部から同意を得てアップロードされた、あるいは LINE の内部で得られたデータをさまざまな形で ETL 処理をし、LINE広告やLINE公式アカウントのような B2B サービスで活用できるようにするためのプロダクトです。 膨大な累積データや非常に大きなトラフィックを扱うこともあり、リアルタイム処理から大規模バッチ処理までさまざまな LINE にまつわるデータ処理を行っています。 プロジェクトの背景 大規模データを取り扱うにあたり、
概要 少し前に{sparklyr}というRからSparkを使うパッケージがRStudio社から公開されました。この{sparklyr}にはS3上のファイルも読み込めるspark_read_csvという関数が提供されており、Amazon Athenaが東京リージョンに来るまで代わりに使えないかと試してみました。 今回はAWS Public Datasetsにあるデータセットを読み込んでみましたが、入力対象のS3バケットに権限があれば同じように扱えると思います。 sparklyr: R interface for Apache Spark 事前準備 {sparklyr}の活用にあたって対象パッケージのインストールと、Spark環境の設定が必要になります。後者については{sparklyr}に関数が用意されているので、今回はそれを使用してローカルに環境構築します。 今回は試しませんが、ローカルでは
はじめに 業務で行っていたとある分析で、大量の確率モデルの確率密度関数を積分して確率を求める機会があったので、この記事ではその際に調べたことを、架空の問題設定を通して紹介していこうと思います。 問題設定 今回はダミーのデータセットを用いて説明しようと思います。 ECサイトのユーザの毎月の決済回数を集計して、平均と標準偏差を算出したという体で、以下のようにしてNumpyとPandasでダミーデータを作成します。 rg = np.random.default_rng() models = [] for i in range(100000): id = str(i) usage = rg.uniform(0, 30, 12) # 各ユーザの12か月の利用回数を0~30の一様分布で生成 mean = float(np.mean(usage)) std = float(np.std(usage))
Sparkで動くアプリケーションをPythonで書いたので、pytestでテストしたい! 大規模データでもテストしたいので、YARNクラスタにも投げたい! ある意味 pytestに入門してみたメモ の続編です。 pytestプラグインとかあるみたいですが 今回は手の内が分かったうえで自分でいろいろやりたかったので、前回の復習も兼ねて自前で作ってみることにします。 spark-submitコマンドを使わずにSparkにアクセス Spark向けに書いたスクリプトを実行する時、普通はspark-submitコマンドを使うと思いますが、今回はpytest経由で実行したいので、spark-submitを使わずに普通のPythonからpysparkモジュールを呼びたいわけです。 そのためには、本来spark-submitがやっている諸々の設定を自分でやればいいはず。spark-submitの処理を追い
概要 spark用のテストライブラリのspark-testing-baseをPySparkでどのように使うかを軽く調べてみた。 調べたこと unittest2のTestCaseを継承しており、unittest2.main()でテスト実行できる クラス内部でspark contextを立ち上げており、テストでわざわざ自力で立ち上げる必要がない 自分でspark contextを立ち上げていると、テスト実行時にエラーになる DataFrameの場合、スキーマと件数と各レコードの並びが一致するかを確認しているみたい from sparktestingbase.sqltestcase import SQLTestCase import unittest2 class DataFrameTest(SQLTestCase): def test_expected_equal(self): data =
久しぶりのLLM以外の記事ですが、どちらかといえばこちらが本業です。 導入 Apache Spark 3.5がリリースされました。 下記のDatabricks公式blogでも取り上げられています。 これを読んでいていると、PysparkのTesting APIというものに目が引かれました。 類似のモジュールは既にあったと思うのですが、公式が出してくれると地味に捗ります。 というわけで、下のドキュメントを基に、ウォークスルーしてみました。 環境はいつものようにDatabricksを使います。DBRは14.0です。 Step1. ビルトイン関数を試す ドキュメントの内容ほぼそのままを実行してきます。 最初はデータフレーム同士の比較を行う関数assertDataFrameEqualでテストします。 import pyspark.testing from pyspark.testing.utils
セッションの要約 Apache Spark 4.0のリリースが近づいており、新機能やバグ修正によりユーザーエクスペリエンスが向上します。注目点はANSIモードとデータハンドリングの改善で、Sparknet GAの導入により多言語サポートが強化され、軽量なクライアントライブラリ「spark-connect」が登場します。文字列照合機能とストリーミングの強化により、データセットの操作が直感的になります。さらに、APIとUDFの改善、プロファイリングやログ記録機能の強化が開発者の効率を向上させます。 Apache Spark 4.0:ANSIモードとデータハンドリングの強化 Apache Spark 4.0のリリースが近づいており、開発者体験が大幅に改善することが予想されています。主な焦点は、ANSIモードとデータハンドリングの顕著な改善に基づいています。 Spark Connect がGAに
本記事執筆時点ではパブリックプレビューですが、 Azure Synapse Analytics の Apache Spark プール では Python (PySpark), Scala, C# (.NET Spark), SQL だけでなく、R言語 (SparkR) も使用することができます。Apache Spark のバージョンとしては 3 以上で利用でき、本記事執筆時点で利用可能なバージョンは 3.1, 3.2, 3.3 (プレビュー) の 3 種類です。 Apache Spark プールで R のパッケージを利用する方法は 2 通りです。 ノートブック セッション パッケージ ワークスペース パッケージ 前者の方が一時的なもので、後者の方が恒久的なものです。 なお、Python の場合は上記以外に Spark プール パッケージ という方法もあり、上記の中だと中間に位置します。 ノ
Shapeless を使ったタイプフルな Spark ライブラリ Frameless について。 はじめに Cats エコシステムのリストを見ると、Frameless なるものが含まれていて、"Expressive types for Spark" とある。個人的に最近 Spark を勉強しているところでもあったので、ちょっと調べてみた。 Frameless 概要 Spark のためのタイプフルな Scala ライブラリで次のような特徴がある。 Shapeless を活用して型安全性に Dataset をラップした TypedDataset が提供される。 カラム指定が型安全。文字列型の列名による指定と違って、コンパイル時にカラムの存在がチェックされる。 エンコーダーの有無や組み込み関数の型の整合性もコンパイル時チェックされる。 キャストや射影((A, B) => (A) とか)なども型安
この記事について Sparkは大規模データを高速に処理できるメリットがある一方で、pandasに比べるとまだまだ柔軟な処理ができるとは言い難い現状です。そこで、Sparkに実装されていない関数については、UDFを利用することがありますが、パフォーマンスが決して良いとは言えない状況です。 そこで、spark 2.3.0から登場したpandas UDFを使うと、高速かつ柔軟にデータを処理することができます。UDFを含めた概要についてはこちらの記事も見てみてください。 pandas UDFの概要 pandasUDFの基本的な構成は pandas.Series もしくは pandas.DataFrame を受け付け、出力値として同様にpandas.Series もしくは pandas.DataFrame を返すような形で記述することができます。 pandasUDFには、 SCALER , GROU
細かすぎるファイルはパフォーマンス的によろしく無いですし、Athenaでのクエリ実行時などでtoo many open filesといったエラーが出やすくなってしまいます。 そこで出力するファイルサイズをもとに書き込むファイル数をいい感じにコントロールするオプションを探しました。しかしそんな方法はなさそうでした。。。 代わりに書き込むファイル数(厳密にはデータフレームの並列数?)を指定するrepartitionというメソッドがありました。 このメソッドを使えば出力されるファイル数は指定できるけど、データサイズに応じて数を調整できないと、データ量の増加に応じていちいちジョブをチューニングしなければいけなくなります。 解決策 ということで、以下のようなs3上のデータサイズを取得する関数と適切なファイル数を計算する関数を作って適切なpartition数を事前計算するようにしました。 def fe
はじめに Sparkを使っていて以下のような辛さがありました。 Hyper Parameter を少しずつ変えてチューニングしたいだけなのに、そこに至るデータ処理に時間がかかりすぎて何度もトライするのが辛い。 spark-shell でもまぁデバッグできなくは無いんだけど、デバッグしたい箇所に至るまでのステップをshellに打ち込むのが面倒。 あぁ Spark でも break point 貼れたらなぁ… この記事はそんな辛さを解消する記事です。 まとめ 簡単な話なので先にまとめると、 spark-submit 時に SPARK_SUBMIT_OPTS に jdwp の設定を入れて起動 Intellij の Remote Debugger 機能で break point を貼る と言うものです。 おことわり この記事は Intellij で Debug する方法を書いていますが、eclip
KAKEHASHI の、Musubi Insight チームのエンジニアの横田です。 KAKEHASHI では BI ツールの Musubi Insight という Web アプリケーションを提供しています。 BI ツールでは薬剤師さんの業務データを可視化しておりますが、そのデータの集計処理には AWS Glue を使っています。 今年 AWS Glue 3.0が使えるようになり、できることが増えました。 チームのデータ基盤の概要と、AWS Glue 3.0 になって新たに使えるようになった PySpark の関数をいくつか紹介していきます。 Musubi Insight チームでの AWS Glue の利用について まず、簡単にデータ基盤の概要について紹介します。 弊社では AWS を利用しサービスを提供しているのですが、各サービスで作られたデータは S3 上に集まってくるようになってい
リリース、障害情報などのサービスのお知らせ
最新の人気エントリーの配信
処理を実行中です
j次のブックマーク
k前のブックマーク
lあとで読む
eコメント一覧を開く
oページを開く