PySpark Pandas_Udf()

Pyspark Pandas Udf



Có thể chuyển đổi Khung dữ liệu PySpark bằng hàm pandas_udf(). Đây là một chức năng do người dùng xác định được áp dụng trên Khung dữ liệu PySpark bằng mũi tên. Chúng ta có thể thực hiện các thao tác vector hóa bằng cách sử dụng pandas_udf(). Nó có thể được thực hiện bằng cách chuyển chức năng này như một công cụ trang trí. Hãy đi sâu vào hướng dẫn này để biết cú pháp, tham số và các ví dụ khác nhau.

Nội dung chủ đề:

Nếu bạn muốn biết về cài đặt mô-đun và Khung dữ liệu PySpark, hãy xem phần này bài báo .







Pyspark.sql.functions.pandas_udf()

pandas_udf() có sẵn trong mô-đun sql.functions trong PySpark, có thể được nhập bằng cách sử dụng từ khóa “từ”. Nó được sử dụng để thực hiện các hoạt động được vector hóa trên Khung dữ liệu PySpark của chúng tôi. Chức năng này được thực hiện giống như một công cụ trang trí bằng cách truyền ba tham số. Sau đó, chúng ta có thể tạo một hàm do người dùng định nghĩa để trả về dữ liệu ở định dạng vectơ (giống như chúng ta sử dụng chuỗi/NumPy cho điều này) bằng cách sử dụng một mũi tên. Trong chức năng này, chúng tôi có thể trả về kết quả.



Cấu trúc & Cú pháp:



Đầu tiên, hãy xem cấu trúc và cú pháp của hàm này:

@pandas_udf(kiểu dữ liệu)
def function_name(thao tác) -> convert_format:
tuyên bố trở lại

Ở đây, function_name là tên của hàm đã xác định của chúng ta. Kiểu dữ liệu chỉ định kiểu dữ liệu được trả về bởi hàm này. Chúng tôi có thể trả về kết quả bằng cách sử dụng từ khóa “return”. Tất cả các hoạt động được thực hiện bên trong chức năng với việc gán mũi tên.





Pandas_udf (Hàm và Kiểu trả về)

  1. Tham số đầu tiên là hàm do người dùng định nghĩa được truyền cho nó.
  2. Tham số thứ hai được sử dụng để chỉ định kiểu dữ liệu trả về từ hàm.

Dữ liệu:

Trong toàn bộ hướng dẫn này, chúng tôi chỉ sử dụng một Khung dữ liệu PySpark để trình diễn. Tất cả các hàm do người dùng xác định mà chúng tôi xác định đều được áp dụng trên Khung dữ liệu PySpark này. Đảm bảo rằng bạn tạo DataFrame này trong môi trường của mình trước sau khi cài đặt PySpark.



nhập pyspark

từ pyspark.sql nhập SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Gợi ý Linux' ).getOrCreate()

từ pyspark.sql.functions nhập pandas_udf

từ pyspark.sql.types nhập *

nhập gấu trúc dưới dạng gấu trúc

#chi tiết rau

rau =[{ 'kiểu' : 'rau quả' , 'tên' : 'cà chua' , 'xác định_quốc gia' : 'HOA KỲ' , 'Số lượng' : 800 },

{ 'kiểu' : 'hoa quả' , 'tên' : 'chuối' , 'xác định_quốc gia' : 'TRUNG QUỐC' , 'Số lượng' : hai mươi },

{ 'kiểu' : 'rau quả' , 'tên' : 'cà chua' , 'xác định_quốc gia' : 'HOA KỲ' , 'Số lượng' : 800 },

{ 'kiểu' : 'rau quả' , 'tên' : 'Quả xoài' , 'xác định_quốc gia' : 'NHẬT BẢN' , 'Số lượng' : 0 },

{ 'kiểu' : 'hoa quả' , 'tên' : 'chanh vàng' , 'xác định_quốc gia' : 'ẤN ĐỘ' , 'Số lượng' : 1700 },

{ 'kiểu' : 'rau quả' , 'tên' : 'cà chua' , 'xác định_quốc gia' : 'HOA KỲ' , 'Số lượng' : 1200 },

{ 'kiểu' : 'rau quả' , 'tên' : 'Quả xoài' , 'xác định_quốc gia' : 'NHẬT BẢN' , 'Số lượng' : 0 },

{ 'kiểu' : 'hoa quả' , 'tên' : 'chanh vàng' , 'xác định_quốc gia' : 'ẤN ĐỘ' , 'Số lượng' : 0 }

]

# tạo khung dữ liệu thị trường từ dữ liệu trên

market_df = linuxhint_spark_app.createDataFrame(rau)

market_df.show()

Đầu ra:

Ở đây, chúng tôi tạo DataFrame này với 4 cột và 8 hàng. Bây giờ, chúng tôi sử dụng pandas_udf() để tạo các hàm do người dùng xác định và áp dụng chúng cho các cột này.

Pandas_udf() với các loại dữ liệu khác nhau

Trong trường hợp này, chúng ta tạo một số hàm do người dùng định nghĩa với pandas_udf() và áp dụng chúng trên các cột và hiển thị kết quả bằng phương thức select(). Trong mỗi trường hợp, chúng tôi sử dụng pandas.Series khi chúng tôi thực hiện các hoạt động được vector hóa. Điều này coi các giá trị cột là một mảng một chiều và thao tác được áp dụng trên cột. Trong chính trình trang trí, chúng tôi chỉ định kiểu trả về của hàm.

Ví dụ 1: Pandas_udf() với Kiểu chuỗi

Ở đây, chúng tôi tạo hai hàm do người dùng xác định với kiểu trả về chuỗi để chuyển đổi các giá trị cột kiểu chuỗi thành chữ hoa và chữ thường. Cuối cùng, chúng tôi áp dụng các hàm này trên các cột “loại” và “xác định_quốc gia”.

# Chuyển đổi kiểu cột thành chữ hoa với pandas_udf

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

trả về i.str.upper()

# Chuyển đổi cột định vị_quốc gia thành chữ thường với pandas_udf

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

trả lại i.str.lower()

# Hiển thị các cột bằng select()

market_df.select( 'kiểu' ,type_upper_case( 'kiểu' ), 'xác định_quốc gia' ,
country_lower_case( 'xác định_quốc gia' )).trình diễn()

Đầu ra:

Giải trình:

Hàm StringType() có sẵn trong mô-đun pyspark.sql.types. Chúng tôi đã nhập mô-đun này trong khi tạo Khung dữ liệu PySpark.

  1. Đầu tiên, UDF (hàm do người dùng xác định) trả về các chuỗi ở dạng chữ hoa bằng cách sử dụng hàm str.upper(). Hàm str.upper() có sẵn trong Cấu trúc dữ liệu chuỗi (vì chúng ta đang chuyển đổi thành chuỗi bằng một mũi tên bên trong hàm) để chuyển đổi chuỗi đã cho thành chữ hoa. Cuối cùng, hàm này được áp dụng cho cột “type” được chỉ định bên trong phương thức select(). Trước đây, tất cả các chuỗi trong cột loại đều ở dạng chữ thường. Bây giờ, chúng được đổi thành chữ hoa.
  2. Thứ hai, UDF trả về các chuỗi ở dạng chữ hoa bằng cách sử dụng hàm str.lower(). Hàm str.lower() có sẵn trong Cấu trúc dữ liệu chuỗi sẽ chuyển đổi chuỗi đã cho thành chữ thường. Cuối cùng, hàm này được áp dụng cho cột “type” được chỉ định bên trong phương thức select(). Trước đây, tất cả các chuỗi trong cột loại đều được viết hoa. Bây giờ, chúng được đổi thành chữ thường.

Ví dụ 2: Pandas_udf() với Kiểu số nguyên

Hãy tạo một UDF để chuyển đổi cột số nguyên PySpark DataFrame thành chuỗi Pandas và thêm 100 vào mỗi giá trị. Truyền cột “số lượng” cho hàm này bên trong phương thức select().

# Thêm 100

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

trả lại tôi + 100

# Truyền cột số lượng cho hàm trên và hiển thị.

market_df.select( 'Số lượng' ,thêm_100( 'Số lượng' )).trình diễn()

Đầu ra:

Giải trình:

Bên trong UDF, chúng tôi lặp lại tất cả các giá trị và chuyển đổi chúng thành Sê-ri. Sau đó, chúng tôi thêm 100 vào mỗi giá trị trong Sê-ri. Cuối cùng, chúng ta chuyển cột “số lượng” cho hàm này và chúng ta có thể thấy rằng 100 được thêm vào tất cả các giá trị.

Pandas_udf() với các loại dữ liệu khác nhau sử dụng Groupby() & Agg()

Hãy xem các ví dụ để chuyển UDF tới các cột tổng hợp. Ở đây, các giá trị cột được nhóm trước tiên bằng hàm groupby() và việc tổng hợp được thực hiện bằng hàm agg(). Chúng tôi chuyển UDF của mình bên trong hàm tổng hợp này.

Cú pháp:

pyspark_dataframe_object.groupby( 'nhóm_cột' .agg(UDF
(pyspark_dataframe_object[ 'cột' ]))

Ở đây, các giá trị trong cột nhóm được nhóm đầu tiên. Sau đó, việc tổng hợp được thực hiện trên từng dữ liệu được nhóm đối với UDF của chúng tôi.

Ví dụ 1: Pandas_udf() với Aggregate Mean()

Ở đây, chúng ta tạo một hàm do người dùng định nghĩa với kiểu trả về float. Bên trong hàm, chúng ta tính trung bình bằng cách sử dụng hàm mean(). UDF này được chuyển đến cột “số lượng” để lấy số lượng trung bình cho mỗi loại.

# trả về giá trị trung bình/trung bình

@pandas_udf( 'trôi nổi' )

def average_function(i: panda.Series) -> float:

trả về i.mean()

# Truyền cột số lượng cho hàm bằng cách nhóm cột loại.

market_df.groupby( 'kiểu' .agg(average_function(market_df[ 'Số lượng' ])).trình diễn()

Đầu ra:

Chúng tôi đang nhóm dựa trên các yếu tố trong cột 'loại'. Hai nhóm được thành lập – “trái cây” và “rau củ”. Đối với mỗi nhóm, giá trị trung bình được tính toán và trả về.

Ví dụ 2: Pandas_udf() với Aggregate Max() và Min()

Ở đây, chúng tôi tạo hai hàm do người dùng định nghĩa với kiểu trả về số nguyên (int). UDF đầu tiên trả về giá trị nhỏ nhất và UDF thứ hai trả về giá trị lớn nhất.

# pandas_udf trả về giá trị nhỏ nhất

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int:

trả lại i.min()

# pandas_udf trả về giá trị lớn nhất

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int:

trả lại i.max()

# Chuyển cột số lượng tới min_ pandas_udf bằng cách nhóm lại vị trí_quốc gia.

market_df.groupby( 'xác định_quốc gia' .agg(min_(market_df[ 'Số lượng' ])).trình diễn()

# Chuyển cột số lượng tới max_ pandas_udf bằng cách nhóm định vị_quốc gia.

market_df.groupby( 'xác định_quốc gia' .agg(max_(market_df[ 'Số lượng' ])).trình diễn()

Đầu ra:

Để trả về các giá trị tối thiểu và tối đa, chúng tôi sử dụng các hàm min() và max() trong kiểu trả về của UDF. Bây giờ, chúng tôi nhóm dữ liệu trong cột “locate_country”. Bốn nhóm được thành lập (“CHINA”, “INDIA”, “JAPAN”, “USA”). Đối với mỗi nhóm, chúng tôi trả lại số lượng tối đa. Tương tự, chúng tôi trả lại số lượng tối thiểu.

Phần kết luận

Về cơ bản, pandas_udf() được sử dụng để thực hiện các hoạt động được vector hóa trên Khung dữ liệu PySpark của chúng tôi. Chúng ta đã thấy cách tạo pandas_udf() và áp dụng nó vào Khung dữ liệu PySpark. Để hiểu rõ hơn, chúng tôi đã thảo luận về các ví dụ khác nhau bằng cách xem xét tất cả các kiểu dữ liệu (chuỗi, số float và số nguyên). Có thể sử dụng pandas_udf() với groupby() thông qua hàm agg().