יצירת צינור נתונים בסטרימינג באמצעות תבנית Dataflow

במדריך למתחילים הזה נסביר איך ליצור פייפליין סטרימינג באמצעות תבנית Dataflow שסופקה על ידי Google. במדריך למתחילים הזה נעשה שימוש בתבנית Pub/Sub to BigQuery כדוגמה.

התבנית Pub/Sub to BigQuery היא צינור להעברת נתונים בזמן אמת שיכול לקרוא הודעות בפורמט JSON מנושא ב-Pub/Sub ולכתוב אותן בטבלה ב-BigQuery.


לחצו על תראו לי איך כדי לקרוא הסבר מפורט על המשימה ישירות במסוף Google Cloud :

תראו לי איך


לפני שמתחילים

לפני שמריצים את צינור הנתונים, צריך לבצע את השלבים הבאים.

הגדרת הפרויקט

  1. נכנסים לחשבון Google Cloud . אם אתם משתמשים חדשים ב- Google Cloud, צרו חשבון כדי שתוכלו להעריך את הביצועים של המוצרים שלנו בתרחישים מהעולם האמיתי. לקוחות חדשים מקבלים בחינם גם קרדיט בשווי 300$ להרצה, לבדיקה ולפריסה של עומסי העבודה.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

התפקידים הנדרשים

כדי להשלים את המדריך למתחילים הזה, אתם צריכים את התפקידים הבאים בניהול הזהויות והרשאות הגישה (IAM).

כדי לקבל את ההרשאות שדרושות לביצוע ההפעלה המהירה הזו, צריך לבקש מהאדמין להקצות לכם את תפקידי ה-IAM הבאים בפרויקט:

להסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.

יכול להיות שאפשר לקבל את ההרשאות הנדרשות גם באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש.

כדי לוודא שלחשבון השירות שמוגדר כברירת מחדל ב-Compute Engine יש את ההרשאות הנדרשות להרצת משימת Dataflow, צריך לבקש מהאדמין להקצות לחשבון השירות שמוגדר כברירת מחדל ב-Compute Engine את תפקידי ה-IAM הבאים בפרויקט:

להסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.

יכול להיות שהאדמין גם יוכל לתת לחשבון השירות שמוגדר כברירת מחדל ב-Compute Engine את ההרשאות שנדרשות באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש אחרים.

יצירת קטגוריה של Cloud Storage

כדי להריץ צינור, צריך קודם ליצור קטגוריה של Cloud Storage.

  1. יוצרים קטגוריה של Cloud Storage:

    1. במסוף Google Cloud , נכנסים לדף Buckets של Cloud Storage.

      כניסה לדף Buckets

    2. לוחצים על יצירה.
    3. ממלאים את פרטי הקטגוריה בדף Create a bucket. כדי לעבור לשלב הבא לוחצים על Continue.
      1. בשדה Name your bucket (שם הקטגוריה), מזינים שם ייחודי לקטגוריה. שם הקטגוריה לא יכול להכיל מידע רגיש, כי מרחב השמות של הקטגוריות זמין וגלוי לכולם.
      2. בקטע Choose where to store your data, מבצעים את הפעולות הבאות:
        1. בוחרים סוג מיקום.
        2. בתפריט הנפתח Location type, בוחרים מיקום שבו יישמרו נתוני הקטגוריה באופן קבוע.
        3. כדי להגדיר שכפול בין מאגרי מידע, בוחרים באפשרות הוספת שכפול בין מאגרי מידע באמצעות Storage Transfer Service ופועלים לפי השלבים הבאים:

          הגדרה של רפליקציה בין מאגרי מידע

          1. בתפריט Bucket, בוחרים באפשרות הרצויה.
          2. בקטע הגדרות השכפול, לוחצים על הגדרה כדי להגדיר את ההגדרות של משימת השכפול.

            מופיעה החלונית Configure cross-bucket replication.

            • כדי לסנן אובייקטים לשכפול לפי קידומת של שם האובייקט, מזינים קידומת שרוצים לכלול או להחריג אובייקטים ממנה, ואז לוחצים על הוספת קידומת.
            • כדי להגדיר סוג אחסון לאובייקטים המשוכפלים, בוחרים סוג אחסון בתפריט סוג אחסון. אם מדלגים על השלב הזה, האובייקטים המשוכפלים ישתמשו בסוג האחסון של קטגוריית היעד כברירת מחדל.
            • לוחצים על סיום.
      3. בקטע Choose how to store your data, מבצעים את הפעולות הבאות:
        1. בקטע Set a default class, בוחרים באפשרות הבאה: Standard.
        2. כדי להפעיל מרחב שמות היררכי, בקטע Optimize storage for data-intensive workloads, בוחרים באפשרות Enable hierarchical namespace on this bucket.
      4. בקטע Choose how to control access to objects, בוחרים אם הקטגוריה אוכפת public access prevention או לא, ואז בוחרים שיטת בקרת גישה לאובייקטים של הקטגוריה.
      5. בקטע Choose how to protect object data, מבצעים את הפעולות הבאות:
        • בוחרים באחת מהאפשרויות בקטע הגנה על נתונים שרוצים להגדיר לקטגוריה.
          • כדי להפעיל מחיקה עם יכולת שחזור, מסמנים את התיבה מדיניות מחיקה עם יכולת שחזור (לשחזור נתונים) ומציינים את מספר הימים שבהם רוצים לשמור אובייקטים אחרי המחיקה.
          • כדי להגדיר ניהול גרסאות של אובייקטים, מסמנים את התיבה ניהול גרסאות של אובייקטים (לשליטה בגרסאות) ומציינים את מספר הגרסאות המקסימלי לכל אובייקט ואת מספר הימים שאחריהם הגרסאות הלא עדכניות יפוגו.
          • כדי להפעיל את מדיניות שמירת הנתונים על אובייקטים וקטגוריות, לוחצים על תיבת הסימון שמירת נתונים (לצורך תאימות), ואז מבצעים את הפעולות הבאות:
            • כדי להפעיל את הנעילה של שמירת אובייקטים, מסמנים את התיבה הפעלת שמירת אובייקטים.
            • כדי להפעיל את נעילת הקטגוריה, מסמנים את תיבת הסימון הגדרת מדיניות שמירת נתונים בקטגוריה ובוחרים יחידת זמן ואת משך הזמן של תקופת השמירה.
        • כדי לבחור איך להצפין את נתוני האובייקט, מרחיבים את הקטע Data encryption () ובוחרים Data encryption method.
    4. לוחצים על יצירה.
  2. מעתיקים את הפרטים הבאים כי תצטרכו אותם בקטע הבא:

    • שם הקטגוריה של Cloud Storage.
    • מזהה הפרויקט Google Cloud .

    כדי למצוא את המזהה הזה, אפשר לעיין במאמר בנושא זיהוי פרויקטים.

רשת VPC

כברירת מחדל, כל פרויקט חדש מתחיל עם רשת ברירת מחדל. אם רשת ברירת המחדל של הפרויקט מושבתת או נמחקה, צריך שתהיה בפרויקט רשת שלחשבון המשתמש שלכם יש בה את התפקיד משתמש ברשת Compute (roles/compute.networkUser).

יצירת מערך נתונים וטבלה ב-BigQuery

יוצרים מערך נתונים וטבלה ב-BigQuery עם הסכימה המתאימה לנושא ב-Pub/Sub באמצעות Google Cloud המסוף.

בדוגמה הזו, שם קבוצת הנתונים הוא taxirides ושם הטבלה הוא realtime. כדי ליצור את מערך הנתונים והטבלה האלה, פועלים לפי השלבים הבאים:

  1. עוברים לדף BigQuery.
    כניסה ל-BigQuery
  2. בחלונית Explorer, ליד הפרויקט שבו רוצים ליצור את מערך הנתונים, לוחצים על View actions (הצגת פעולות) ואז על Create dataset (יצירת מערך נתונים).
  3. בחלונית Create dataset (יצירת מערך נתונים), פועלים לפי השלבים הבאים:
    1. בשדה Dataset ID (מזהה מערך הנתונים), מזינים taxirides. מזהי מערכי נתונים הם ייחודיים לכל Google Cloud פרויקט.
    2. בקטע Location type, בוחרים באפשרות Multi-region ואז באפשרות US (multiple regions in United States). מערכי נתונים ציבוריים מאוחסנים בUS מיקום רב-אזורי. כדי לפשט את התהליך, כדאי למקם את מערך הנתונים באותו מיקום.
    3. משאירים את שאר הגדרות ברירת המחדל ולוחצים על יצירת מערך נתונים.
  4. בחלונית Explorer, מרחיבים את הפרויקט.
  5. לצד קבוצת הנתונים taxirides, לוחצים על View actions ואז על Create table.
  6. בחלונית Create table (יצירת טבלה), פועלים לפי השלבים הבאים:
    1. בקטע מקור, באפשרות יצירת טבלה מ, בוחרים באפשרות טבלה ריקה.
    2. בקטע יעד, בשדה טבלה, מזינים realtime.
    3. בקטע Schema (סכימה), לוחצים על המתג Edit as text (עריכה כטקסט) ומדביקים את הגדרת הסכימה הבאה בתיבה:
      ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp,
      meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
    4. בקטע Partition and cluster settings, בשדה Partitioning, בוחרים את השדה timestamp.
  7. משאירים את שאר הגדרות ברירת המחדל ולוחצים על יצירת טבלה.

הרצת צינור עיבוד הנתונים

מריצים צינור סטרימינג באמצעות התבנית Pub/Sub to BigQuery שסופקה על ידי Google. הצינור מקבל נתונים נכנסים מנושא הקלט.

  1. עוברים לדף משימות ב-Dataflow.
    כניסה לדף Jobs
  2. לוחצים על Create job from template (יצירת משימה מתבנית).
  3. מזינים taxi-data בתור שם המשימה של משימת Dataflow.
  4. בקטע תבנית Dataflow, בוחרים בתבנית Pub/Sub to BigQuery.
  5. בקטע BigQuery output table (טבלת פלט של BigQuery), מזינים את הפרטים הבאים:
    PROJECT_ID:taxirides.realtime

    מחליפים את PROJECT_ID במזהה הפרויקט שבו יצרתם את מערך הנתונים ב-BigQuery.

  6. בקטע Optional source parameters, בשדה Input Pub/Sub topic, לוחצים על Enter topic manually.
  7. בתיבת הדו-שיח, בשדה Topic name, מזינים את השם הבא ולוחצים על Save:
    projects/pubsub-public-data/topics/taxirides-realtime

    נושא Pub/Sub שזמין לציבור הזה מבוסס על מערך הנתונים הפתוח של NYC Taxi & Limousine Commission. ההודעה הבאה היא דוגמה להודעה בנושא הזה, בפורמט JSON:

    {
      "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e",
      "point_idx": 217,
      "latitude": 40.75399,
      "longitude": -73.96302,
      "timestamp": "2021-03-08T02:29:09.66644-05:00",
      "meter_reading": 6.293821,
      "meter_increment": 0.029003782,
      "ride_status": "enroute",
      "passenger_count": 1
    }
  8. בקטע Temp location, מזינים את הפרטים הבאים:
    gs://BUCKET_NAME/temp/

    מחליפים את BUCKET_NAME בשם הקטגוריה שלכם ב-Cloud Storage. התיקייה temp מאחסנת קבצים זמניים, כמו משימת צינור העיבוד שהועברה למצב מוכן.

  9. אם בפרויקט שלכם אין רשת שמוגדרת כברירת מחדל, צריך להזין רשת ורשת משנה. מידע נוסף זמין במאמר ציון רשת ורשת משנה.
  10. לוחצים על הפעלת העבודה.

צפייה בתוצאות

כדי לראות את הנתונים שנכתבו בטבלת realtime:

  1. עוברים לדף BigQuery.

    כניסה ל-BigQuery

  2. לוחצים על Compose a new query. תיפתח כרטיסייה חדשה של כלי העריכה.

    SELECT * FROM `PROJECT_ID.taxirides.realtime`
    WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    LIMIT 1000

    מחליפים את PROJECT_ID במזהה הפרויקט שבו יצרתם את מערך הנתונים ב-BigQuery. יכול להיות שיחלפו עד חמש דקות לפני שנתונים יתחילו להופיע בטבלה.

  3. לוחצים על Run.

    השאילתה מחזירה שורות שנוספו לטבלה ב-24 השעות האחרונות. אפשר גם להריץ שאילתות באמצעות SQL סטנדרטי.

הסרת המשאבים

כדי לא לצבור חיובים לחשבון Google Cloud על המשאבים שבהם השתמשתם בדף הזה, פועלים לפי השלבים הבאים:

מחיקת הפרויקט

הדרך הקלה ביותר לבטל את החיוב היא למחוק את Google Cloud הפרויקט שיצרתם בשביל המדריך למתחילים.

  1. במסוף Google Cloud , נכנסים לדף Manage resources.

    כניסה לדף Manage resources

  2. ברשימת הפרויקטים, בוחרים את הפרויקט שרוצים למחוק ולוחצים על Delete.
  3. כדי למחוק את הפרויקט, כותבים את מזהה הפרויקט בתיבת הדו-שיח ולוחצים על Shut down.

מחיקת המשאבים הבודדים

אם אתם רוצים לשמור את הפרויקט שבו השתמשתם במדריך למתחילים הזה, אתם צריכים למחוק את המשאבים הבודדים: Google Cloud

  1. עוברים לדף משימות ב-Dataflow.
    כניסה לדף Jobs
  2. בוחרים את משימת הסטרימינג מרשימת המשימות.
  3. בניווט, לוחצים על הפסקה.
  4. בתיבת הדו-שיח Stop job, בוחרים באפשרות cancel או drain של צינור הנתונים ולוחצים על Stop job.
  5. עוברים לדף BigQuery.
    כניסה ל-BigQuery
  6. בחלונית Explorer מרחיבים את הפרויקט.
  7. לצד מערך הנתונים שרוצים למחוק, לוחצים על הצגת פעולות ואז על פתיחה.
  8. בחלונית הפרטים, לוחצים על מחיקת מערך הנתונים ופועלים לפי ההוראות.
  9. במסוף Google Cloud , נכנסים לדף Buckets של Cloud Storage.

    כניסה לדף Buckets

  10. לוחצים על תיבת הסימון של הקטגוריה שרוצים למחוק.
  11. כדי למחוק את הקטגוריה, לוחצים על Delete ופועלים לפי ההוראות.

המאמרים הבאים