Cách triển khai truyền dữ liệu thời gian thực bằng Python

Cach Trien Khai Truyen Du Lieu Thoi Gian Thuc Bang Python



Nắm vững việc triển khai truyền dữ liệu theo thời gian thực bằng Python đóng vai trò là một kỹ năng thiết yếu trong thế giới liên quan đến dữ liệu ngày nay. Hướng dẫn này khám phá các bước cốt lõi và các công cụ cần thiết để sử dụng tính năng truyền dữ liệu theo thời gian thực với tính xác thực trong Python. Từ việc chọn một khung phù hợp như Apache Kafka hoặc Apache Pulsar đến viết mã Python để tiêu thụ, xử lý và trực quan hóa dữ liệu một cách dễ dàng, chúng ta sẽ có được các kỹ năng cần thiết để xây dựng các kênh dữ liệu thời gian thực linh hoạt và hiệu quả.

Ví dụ 1: Triển khai truyền dữ liệu thời gian thực bằng Python

Việc triển khai truyền phát dữ liệu theo thời gian thực bằng Python là rất quan trọng trong thời đại và thế giới dựa trên dữ liệu ngày nay. Trong ví dụ chi tiết này, chúng ta sẽ hướng dẫn quy trình xây dựng hệ thống truyền dữ liệu theo thời gian thực bằng cách sử dụng Apache Kafka và Python trong Google Colab.







Để khởi tạo ví dụ trước khi bắt đầu viết mã, việc xây dựng một môi trường cụ thể trong Google Colab là điều cần thiết. Việc đầu tiên chúng ta cần làm là cài đặt các thư viện cần thiết. Chúng tôi sử dụng thư viện “kafka-python” để tích hợp Kafka.



! pip cài đặt kafka-python


Lệnh này cài đặt thư viện “kafka-python” cung cấp các hàm Python và các liên kết cho Apache Kafka. Tiếp theo, chúng tôi nhập các thư viện cần thiết cho dự án của mình. Nhập các thư viện cần thiết bao gồm “KafkaProducer” và “KafkaConsumer” là các lớp từ thư viện “kafka-python” cho phép chúng ta tương tác với các nhà môi giới Kafka. JSON là thư viện Python để làm việc với dữ liệu JSON mà chúng tôi sử dụng để tuần tự hóa và giải tuần tự hóa các tin nhắn.



từ kafka nhập KafkaProducer, KafkaConsumer
nhập json


Tạo ra một nhà sản xuất Kafka





Điều này rất quan trọng vì nhà sản xuất Kafka sẽ gửi dữ liệu đến chủ đề Kafka. Trong ví dụ của chúng tôi, chúng tôi tạo một nhà sản xuất để gửi dữ liệu thời gian thực mô phỏng đến một chủ đề có tên là “chủ đề thời gian thực”.

Chúng tôi tạo một phiên bản “KafkaProducer” chỉ định địa chỉ của nhà môi giới Kafka là “localhost:9092”. Sau đó, chúng tôi sử dụng “value_serializer”, một hàm tuần tự hóa dữ liệu trước khi gửi nó tới Kafka. Trong trường hợp của chúng tôi, hàm lambda mã hóa dữ liệu dưới dạng JSON được mã hóa UTF-8. Bây giờ, hãy mô phỏng một số dữ liệu thời gian thực và gửi nó đến chủ đề Kafka.



nhà sản xuất = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( TRONG ) .encode ( 'utf-8' ) )
# Dữ liệu mô phỏng thời gian thực
dữ liệu = { 'cảm biến_id' : 1 , 'nhiệt độ' : 25,5 , 'độ ẩm' : 60,2 }
# Gửi dữ liệu đến chủ đề
nhà sản xuất.send ( 'chủ đề thời gian thực' , dữ liệu )


Trong những dòng này, chúng tôi xác định từ điển “dữ liệu” đại diện cho dữ liệu cảm biến mô phỏng. Sau đó, chúng tôi sử dụng phương thức “gửi” để xuất bản dữ liệu này lên “chủ đề thời gian thực”.

Sau đó, chúng tôi muốn tạo người tiêu dùng Kafka và người tiêu dùng Kafka đọc dữ liệu từ chủ đề Kafka. Chúng tôi tạo ra một người tiêu dùng để sử dụng và xử lý các tin nhắn trong “chủ đề thời gian thực”. Chúng tôi tạo một phiên bản “KafkaConsumer”, chỉ định chủ đề mà chúng tôi muốn sử dụng, ví dụ: (chủ đề thời gian thực) và địa chỉ của nhà môi giới Kafka. Sau đó, “value_deserializer” là một hàm giải tuần tự hóa dữ liệu nhận được từ Kafka. Trong trường hợp của chúng tôi, hàm lambda giải mã dữ liệu dưới dạng JSON được mã hóa UTF-8.

người tiêu dùng = KafkaConsumer ( 'chủ đề thời gian thực' ,
bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.decode ( 'utf-8' ) ) )


Chúng tôi sử dụng vòng lặp để liên tục sử dụng và xử lý các thông báo từ chủ đề.

# Đọc và xử lý dữ liệu thời gian thực
tin nhắn TRONG người tiêu dùng:
dữ liệu = tin nhắn.value
in ( f 'Dữ liệu đã nhận: {data}' )


Chúng tôi truy xuất giá trị của từng thông báo và dữ liệu cảm biến được mô phỏng của chúng tôi bên trong vòng lặp và in nó ra bảng điều khiển. Việc chạy trình sản xuất và trình tiêu dùng Kafka liên quan đến việc chạy mã này trong Google Colab và thực thi các ô mã riêng lẻ. Nhà sản xuất gửi dữ liệu mô phỏng đến chủ đề Kafka và người tiêu dùng đọc và in dữ liệu nhận được.


Phân tích đầu ra khi mã chạy

Chúng tôi sẽ quan sát dữ liệu thời gian thực đang được sản xuất và tiêu thụ. Định dạng dữ liệu có thể khác nhau tùy thuộc vào nguồn dữ liệu mô phỏng hoặc thực tế của chúng tôi. Trong ví dụ chi tiết này, chúng tôi đề cập đến toàn bộ quá trình thiết lập hệ thống truyền dữ liệu theo thời gian thực bằng cách sử dụng Apache Kafka và Python trong Google Colab. Chúng tôi sẽ giải thích từng dòng mã và ý nghĩa của nó trong việc xây dựng hệ thống này. Truyền dữ liệu theo thời gian thực là một khả năng mạnh mẽ và ví dụ này đóng vai trò là nền tảng cho các ứng dụng thực tế phức tạp hơn.

Ví dụ 2: Triển khai truyền dữ liệu thời gian thực bằng Python bằng dữ liệu thị trường chứng khoán

Hãy thực hiện một ví dụ độc đáo khác về việc triển khai truyền dữ liệu theo thời gian thực trong Python bằng một kịch bản khác; lần này, chúng tôi sẽ tập trung vào dữ liệu thị trường chứng khoán. Chúng tôi tạo ra một hệ thống truyền dữ liệu theo thời gian thực để nắm bắt những thay đổi về giá cổ phiếu và xử lý chúng bằng cách sử dụng Apache Kafka và Python trong Google Colab. Như đã trình bày trong ví dụ trước, chúng tôi bắt đầu bằng cách định cấu hình môi trường của mình trong Google Colab. Đầu tiên chúng ta cài đặt các thư viện cần thiết:

! pip cài đặt kafka-python yfinance


Ở đây, chúng tôi thêm thư viện “yfinance” cho phép chúng tôi lấy dữ liệu thị trường chứng khoán theo thời gian thực. Tiếp theo, chúng tôi nhập các thư viện cần thiết. Chúng tôi tiếp tục sử dụng các lớp “KafkaProducer” và “KafkaConsumer” từ thư viện “kafka-python” để tương tác với Kafka. Chúng tôi nhập JSON để làm việc với dữ liệu JSON. Chúng tôi cũng sử dụng “yfinance” để lấy dữ liệu thị trường chứng khoán theo thời gian thực. Chúng tôi cũng nhập thư viện “thời gian” để thêm độ trễ thời gian nhằm mô phỏng các bản cập nhật theo thời gian thực.

từ kafka nhập KafkaProducer, KafkaConsumer
nhập json
nhập khẩu yfinance BẰNG yf
nhập khẩu thời gian


Bây giờ, chúng tôi tạo một nhà sản xuất Kafka cho dữ liệu chứng khoán. Nhà sản xuất Kafka của chúng tôi nhận được dữ liệu chứng khoán theo thời gian thực và gửi nó đến một chủ đề Kafka có tên là “giá cổ phiếu”.

nhà sản xuất = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( TRONG ) .encode ( 'utf-8' ) )

trong khi ĐÚNG VẬY:
chứng khoán = yf.Ticker ( 'AAPL' ) # Ví dụ: cổ phiếu Apple Inc.
stock_data = stock.history ( Giai đoạn = '1ngày' )
giá cuối cùng = dữ liệu chứng khoán [ 'Đóng' ] .iloc [ - 1 ]
dữ liệu = { 'biểu tượng' : 'AAPL' , 'giá' : giá cuối cùng }
nhà sản xuất.send ( 'giá cổ phiếu' , dữ liệu )
thời gian ngủ ( 10 ) # Mô phỏng cập nhật theo thời gian thực cứ sau 10 giây


Chúng tôi tạo một phiên bản “KafkaProducer” với địa chỉ của nhà môi giới Kafka trong mã này. Bên trong vòng lặp, chúng tôi sử dụng “yfinance” để nhận giá cổ phiếu mới nhất của Apple Inc. (“AAPL”). Sau đó, chúng tôi trích xuất giá đóng cửa cuối cùng và gửi đến chủ đề “giá cổ phiếu”. Cuối cùng, chúng tôi giới thiệu độ trễ thời gian để mô phỏng các cập nhật theo thời gian thực cứ sau 10 giây.

Hãy tạo một người tiêu dùng Kafka để đọc và xử lý dữ liệu giá cổ phiếu từ chủ đề “giá cổ phiếu”.

người tiêu dùng = KafkaConsumer ( 'giá cổ phiếu' ,
bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.decode ( 'utf-8' ) ) )

tin nhắn TRONG người tiêu dùng:
stock_data = tin nhắn.value
in ( f 'Dữ liệu chứng khoán đã nhận: {stock_data['symbol']} - Giá: {stock_data['price']}' )


Mã này tương tự như thiết lập người tiêu dùng của ví dụ trước. Nó liên tục đọc và xử lý các tin nhắn từ chủ đề “giá cổ phiếu” và in ký hiệu và giá cổ phiếu ra bảng điều khiển. Chúng tôi thực thi các ô mã một cách tuần tự, ví dụ: từng ô một trong Google Colab để điều hành nhà sản xuất và người tiêu dùng. Nhà sản xuất nhận và gửi thông tin cập nhật giá cổ phiếu theo thời gian thực trong khi người tiêu dùng đọc và hiển thị dữ liệu này.

! pip cài đặt kafka-python yfinance
từ kafka nhập KafkaProducer, KafkaConsumer
nhập json
nhập khẩu yfinance BẰNG yf
nhập khẩu thời gian
nhà sản xuất = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( TRONG ) .encode ( 'utf-8' ) )

trong khi ĐÚNG VẬY:
chứng khoán = yf.Ticker ( 'AAPL' ) # cổ phiếu Apple Inc.
stock_data = stock.history ( Giai đoạn = '1ngày' )
giá cuối cùng = dữ liệu chứng khoán [ 'Đóng' ] .iloc [ - 1 ]

dữ liệu = { 'biểu tượng' : 'AAPL' , 'giá' : giá cuối cùng }

nhà sản xuất.send ( 'giá cổ phiếu' , dữ liệu )

thời gian ngủ ( 10 ) # Mô phỏng cập nhật theo thời gian thực cứ sau 10 giây
người tiêu dùng = KafkaConsumer ( 'giá cổ phiếu' ,
bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.decode ( 'utf-8' ) ) )

tin nhắn TRONG người tiêu dùng:
stock_data = tin nhắn.value
in ( f 'Dữ liệu chứng khoán đã nhận: {stock_data['symbol']} - Giá: {stock_data['price']}' )


Trong phần phân tích đầu ra sau khi mã chạy, chúng ta sẽ quan sát cập nhật giá cổ phiếu theo thời gian thực của Apple Inc. đang được sản xuất và tiêu thụ.

Phần kết luận

Trong ví dụ độc đáo này, chúng tôi đã trình diễn cách triển khai truyền dữ liệu theo thời gian thực bằng Python bằng cách sử dụng Apache Kafka và thư viện “yfinance” để thu thập và xử lý dữ liệu thị trường chứng khoán. Chúng tôi đã giải thích kỹ lưỡng từng dòng mã. Truyền dữ liệu theo thời gian thực có thể được áp dụng cho nhiều lĩnh vực khác nhau để xây dựng các ứng dụng trong thế giới thực về tài chính, IoT, v.v.