2015-07-25

ארכיטקטורה של Hyper-Scaling

נושא ה Scalability הוא פופולרי-במיוחד בכמה השנים האחרונות.

לאחרונה עברתי על עשרות קורות חיים של מועמדים - ובכמעט כולם צוינו מושגים כמו "High Scalability", או "Big Data", "NoSQL", "Hadoop" - וכו'. כנראה שכל מי שעבד במערכת עם הרבה transcriptions per seconds או נפחים גדולים של נתונים - סיפר על כך בהרחבה, ומי שלא - התאמץ להראות איזו זיקה. זה "המשחק" היום, ונראה לי שהייתי עושה בעצמי את אותו הדבר בדיוק!

בפוסט הזה אני רוצה לספר על תהליך של Hyper-Scaling שאנו עוברים בחברת Gett - וכיצד הוא משפיע על עבודת הארכיטקטורה.


No Scale

אני רוצה להזכיר שהמונח "Scalability", מתייחס בהנדסת תוכנה לשני סוגים של אתגרים:
  • Software Scalability - התמודדות עם יותר משתמשים, יותר פעילות, יותר נתונים.
  • Development Scalability - היכולת להתנהל עם צוות פיתוח גדול יותר.

ב Gett יש לנו Software Scale מסוים, שהוא לא קטן - אבל גם לא ענק. ככה וככה נתונים, ככה וככה פעולות בשנייה.
ההתמודדות העכשווית שלנו היא דווקא יותר עם Development Scalability, שכמו שאנסה להראות במהלך הפוסט - יש לה דמיון לא-קטן ל Software Scalability.

לפני כחצי שנה, כשהגעתי ל Gett היו בצוות צד-השרת כשישה מתכנתים. הגעתי מעט לאחר גיוס ענק של 150M$ שהחברה ביצעה. עם הגיוס, החברה החליטה להגדיל משמעותית את קבוצת ה R&D - בכדי לקבל משמעותית יותר תפוקה. בעת כתיבת הפוסט יש לנו כבר עשרים וחמישה (!!!) מתכנתי צד-השרת - ואנחנו עוד מגייסים.

את הכלל של "לא להגדיל גוף פיתוח ביותר מ 50% בשנה" - שברנו כבר מזמן... מה עושים עכשיו? ואיך מתמודדים עם זה מצד הארכיטקטורה?


Scale


ההקבלה בין Scale של תוכנה ו Scale של קבוצות-פיתוח


נוסחה מקובלת בעולם ה Software Scale היא זו:



אנו מגיעים ל Scale כאשר יש לנו כמות משאבים (למשל: שרתים) מסוימת, וכל שרת מבצע עבודה בקצב מסוים.
גדילה ב Scale, כלומר: Scaling - מתבצעת ע"י הוספת שרתים או לחלופין ע"י שיפור הביצועים של כל מחשב בודד במערכת.


ככל שהמערכת גדלה - סביר שנחווה מצב בו כל מחשב נוסף שאנו מוסיפים הוא פחות יעיל מקודמו. מדוע? מכיוון ש:
  • פעולות על כמות גדולה יותר של נתונים - אורכות יותר זמן. למשל: אינדקסים בבסיס נתונים רלציוני, הפחתת הרציפות בדיסק, caches פחות יעילים או סתם פעולות joins גדולות יותר (merge על work set גדול יותר).
  • יותר תקשורת שנדרשת בין המחשבים השונים במערכת. יש הבדל בין הודעות עדכון שנשלחות ל 6 מחשבים - וכאלו שנשלחות ל 25 מחשבים. פעם נתקלתי במערכת שהוספה של מחשבים למערכת, מעל 16 מחשבים, כבר לא הגדילה את ה scale - בגלל ריבוי של הודעות כאלו.
  • כמות הקוד שלא ניתן למקבל (parallelism) באופן טבעי תגדל, ולא תקטן - אלא אם הייתה השקעה משמעותית בצמצום קוד שכזה.
  • חוסרי יעילות שונים - שצצים במערכת באקראיות טבעית.

באופן דומה, גם בגדילה של גוף פיתוח - המפתח האחרון שנוסף נוטה להיות (בממוצע!) פחות יעיל מקודמו:
  • כל עבודה רוחבית במערכת (למשל: Refracting גדול), הופכים להיות קשים וארוכים פי כמה - כאשר כמות הקוד גדולה יותר.
  • יותר תקשורת וסנכרון נדרשת בין המפתחים בקבוצה. אם פעם היה מספיק להרים את הראש מבהייה במסך - בכדי ליצור קשר עם מתכנת שמכיר היטב היבט מסוים של המערכת, היום כבר צריך לקום מהמקום, לחפש - ולעתים לגלות שצריך לדבר עם כמה אנשים בכדי לקבל תמונה מלאה.
  • תמונות-העולם של המפתחים בארגון מתבזרות במהירות: בניגוד לצוות שהיה לו זמן להתגבש במשך תקופה ארוכה - כעת יש זרימה של אנשים חדשים, שכל אחד רגיל לשיטות שונות וגישות שונות.
    הגישות הללו, עבור כל אחד, "הוכיחו את עצמן מעל ספק סביר - בעבר". אמת. אבל מה עושים כאשר הגישות הפוכות זו לזו? האם ORM הוא טוב להכל, טוב רק לקונפיגורציה, או "רעה-חולה שיש להסיר מהמערכת בהקדם האפשרי!"?
  • יותר ידיים עובדות => יותר קוד => מערכת מורכבת יותר. כל שינוי במערכת מורכבת יותר - אורך יותר זמן בעצמו (מגמה לחוסר יעילות מובנה).
  • נוצרים יותר צווארי בקבוק ("רק משה מכיר את הקוד הזה") - שהולכים ומקשים יותר ויותר על התקדמות הפיתוח.
  • יותר ישיבות, יותר המוניות, יותר אנשים שיש להכיר ולהתרגל לעבוד איתם - חוסרי יעילות שונים, שצצים במערכת באקראיות כל-כך טבעית.
ההשקעה ב Development Scale בפיתוח אמנם עוסקת במידה רבה בגיוס עובדים ("Capacity"), אבל לא פחות מכך - בשיפור היעילות של כל עובד ("Performance"). תהליכי ה Continuous Integration (ליתר דיוק: on-going integration) - מוגדרים מחדש, אנו משקיעים יותר בשיתוף הידע - ופישוט שלו, וכאן יש לצוות הארכיטקטים תפקיד חשוב.

אנו חותרים ל Continuous Delivery (הפעם: באמת) - בכדי לשפר את יכולת התגובה לתקלות במערכת, וכדי לעשות אותה יציבה יותר. באופן פרדוקסלי משהו, הניסיון בתעשייה מראה שדווקא העלאת תכיפות ה deployments מגדיל את יציבות המערכת - בטווח הבינוני-ארוך. יותר deploys = יותר "שבירות", אבל אז גם יותר לקחים, יותר מנגנוני-התאוששות ובקרה, ויותר אוטומציה. כל עוד אין מנגנון ארגוני שמותיר "לעצור את הקצב, ולצמצם את קצב ה deploys" - האנרגיות ינותבו לשיפור המערכת, ומנגנוני הייצוב שלה.


--

ב Software Scale, יש את השאיפה התמידית ל Linear Scalability: האידאל שלפיו הוספה של כל מכונה למערכת, תתרום את החלק היחסי שלה. למשל: הכפלת כמות השרתים - תכפיל את הספק העבודה (למשל: כמות בקשות בשנייה).

לא ממש Linear Scaling: ככל שמספר הבקשות עולה - יש להוסיף חלק יחסי גדול יותר של שרתים בכדי לענות על הביקוש.
יש במערכת הזו צווארי בקבוק מסוימים ל scalability.

בקבוצת ה R&D כולנו מבינים שככל שהמערכת גדלה - היעילות של המתכנים הולכת וקטנה. אין לנו שאיפות ל Linear Development Scalability. אנחנו גם מכירים במגבלות המקבילות האנושית ("תשע נשים לא יכולות ללדת ילד בחודש אחד").

בשונה מאיתנו ל Business דווקא יש ציפיות ל Linear Scalability - מפורשות יותר או פחות.
"פי-2 אנשי support עונים לפי-1.9 קריאות במוקד?" - הם מספרים, "כן... אנחנו מבינים שהנדסה זה קצת יותר מורכב. הגדלנו את הפיתוח פי 4, ואי-אפשר לקבל פי-4 פיצ'רים - אבל גם אי אפשר כבר לצפות לפחות לפי-3 יותר פיצ'רים, או לקבל אותם לפחות פי-3 יותר מהר?"

הלחץ מצד הביזנס הוא אולי משני, אבל הוא משפיע - וגורם לנו להתייעל יותר בכל הנוגע ל Development Scalability של הפיתוח. בעיקר ע"י צמצום חוסר-היעילות שהמערכת יוצרת למפתח הבודד.


--

ב Software Scale, יש "קסם" שיכול לסייע למערכת לצמוח ב Scale שהוא יותר מלינארי: מצב בו פי-2 שרתים, משרתים יותר מפי-2 משתמשים. כיצד זה קורה? יש כמה דרכים, אבל ה"קסם" הנפוץ ביותר הוא Cache (או memoization - בגרסה התאורטית שלו).
כאשר אנו יכולים לבצע חישוב מורכב רק פעם ב 5 דקות, ואז להפיץ את התוצאות לעוד ועוד משתמשים - כמות גדולה אפילו יותר של משתמשים תגדיל את הלחץ רק על ערוץ ההפצה (CDN?) - ולא על יצירת התוכן (החישוב).

ככל שנתכנן את המערכת שלנו בצורה בה ניתן יהיה להשתמש יותר ויותר ב Caches שכאלו - נשפר את ה Scalability של המערכת. תכנון שכזה כולל, הרבה פעמים - משא ומתן עם אנשי הביזנס ("תקבלו את זה מעודכן פעם בשעה - לא כל הזמן").

(פתרונות אחרים כוללים העברת עבודה ל Clients, או צמצום העבודה לקצב קטן מבו עולה ה scale, למשל: ביצוע חישוב על 500 משתמשי-מדגם, ללא קשר למספר כלל-המשתמשים במערכת).


Scaling שהוא טוב מ Linear-Scaling: הוספת שרת למערכת - מוסיפה יכולת לספק קצת יותר משתמשים מחלקו במערכת.

ב Development Scaling יש גם כמה "קסמים" שכאלו. הבולט בהם - הוא code re-usability: היכולת להשתמש בקוד שנכתב בעבר - עבור פיצ'ר חדש.

פתרון שונה-דומה הוא Generalization: כתיבת קוד כללי יותר - שיכול לשרת מטרות דומות, אך שונות.

הקסמים האלו - הם חמקמקים ביותר!
הם באמת לפעמים "עושים את הקסם" - אבל פעמים אחרות "יוצא מהם כל האוויר" ברגע האמת: אנו משקיעים עוד זמן ועבודה בקוד כללי יותר / קוד המתוכנן לשימוש חוזר - אבל אז השימוש החוזר פשוט לא מתאים. אם השקענו בקוד כללי - ואין לו שימוש, אנו נשארים עם קוד יקר יותר, מסובך יותר, ועם הצורך עדיין לספק פתרון לפיצ'ר השני (או, חס וחלילה: לאנוס את הפיצ'ר השני להיות משהו אחר - בכדי להתאים לקוד שכבר קיים).

מהנדסים צעירים, ואולי אף מהנדסים בכלל - נוטים לבצע הערכת יתר (גסה?) ליכולת שלהם לייצר קוד יעיל לשימוש חוזר / קוד כללי יעיל. משם נוצר הכלל You Ain't Gonna Need It (בקיצור: YAGNI) המציע פשוט לא לנסות לחזות מקרים אלו מראש, אלא לעשות Refactoring לקוד כללי רק ברגע שהוכח הצורך - מעל ספק סביר.

בכל מקרה: שימוש חוזר בקוד והכללה, גם אם נעשים בדיעבד - הם כלים חשובים מאוד לשיפור ה Development Scalability.




אז מה עם ארכיטקטורה ל Hyper-Scaling?!


אולי אתם מאוכזבים מעט מהפוסט: יש בו הרבה דיבורים כללים, ואין בו Hadoop, HPC או Big Data משום צורה!

אני אנסה לתמצת:
הפיתוח של Gett עובר כרגע תהליך של Development Hyper-Scaling. יש גם בעיות של Software-Scaling - אבל הן (עדיין) פחות מאתגרות - אולי אזכיר לקחים משם בפוסט אחר.

הארכיטקטורה, או תוכנית-העל שלנו להתמודד עם בעיות ה Development Hyper Scaling הן כאלו:
  • בראש ובראשונה - מעבר ל Micro-Services: הפיכת מערכת אחת מורכבת - לכמה מערכות קטנות יותר, ומורכבות פחות. המעבר הוא אינטנסיבי, אבל הוא מאפשר להבין ביתר קלות את כלל המערכת - ולדעת להיכן לצלול בעת הצורך. כמו כן - הוא מצמצם במידה רבה את הצורך בתקשורת מרובה, לטובת תקשורת בסיסית יותר וממוקדת יותר, למשל: סך האחריויות של כל שירות, וה APIs שלו - שמוגדרים היטב (אנחנו משתמשים ב Swagger לתיעוד - כלי שמשרת אותנו היטב).
    את השימוש ב MSA להתמודדות עם Development Hyper-Scaling לא המצאנו בעצמנו: למדנו מ case-studies על חברות שעמדו באתגר דומה (למשל: אמזון).
    • שימוש-חוזר בקוד, הוא רעיון שקשה לממש (מעבר לפונקציה פה ושם). דווקא Micro-services, בכך שאנו מגדירים שירותים עם שימוש עסקי ברור, ו APIs מוגדרים היטב - מסייעים לנו ליצור יחידות גדולות של קוד שמתאימות לשימוש-חוזר. כבר בחצי-שנה האחרונה, היו לנו כמה הצלחות יפות.
  • אנו עוסקים בצורה פרואקטיבית בארגון בשיתוף ידע על חלקי המערכת השונים, האחריויות שלהם, וה flows העיקריים במערכת. לא עובר כמעט שבוע שאני לא עושה session שכזה, לצוות כלשהו בפיתוח - ואני לא היחידי. עוד פעם ועוד פעם - עד שלכולם כבר יימאס (אנחנו עוד רחוקים משם...).
    שמות פשוטים, מטפורות טובות, וסיפורים קליטים - הם מרכיב עקרי בבניית והפצת הידע.
  • צוות הארכיטקטים לוקח תפקיד קצת יותר ריכוזי מהרגיל (אולי: יותר מהאידאל האישי שלי?!) בהגדרת superflows חדשים במערכת. כן! אנחנו רוצים לעבוד יותר agile ולתת לאנשים יותר ויותר אחריות והשפעה, אבל בנקודת הזמן הזו - תוצאות טובות יותר מושגות כאשר לפחות את ה flows העיקרים - מוגדרים מרכזית ע"י הארכיטקטים.
    כאשר מפתחים עושים שינויים ושיפורים ב flows - זו סיבה לשמחה (אלא אם בכך הם סותרים עקרונות או flows אחרים במערכת).
  • אנו מנסים לקדם בקוד כמה עקרונות:
    • קידום תרבות של הצגת פתרונות - ולא רק בעיות (בכל ה R&D).
    • קוד פשוט להבנה - עדיף יותר על פני קוד קצר או מתוחכם. אם אתם קוראים של הבלוג זמן רב, אתם אולי יודעים שזו הנטייה הטבעית שלי - אבל זה לא הסטנדרט הברור של ריילס (שם עקרונות של קוד קצר ו DRY - מושרשים עמוק בקהילה).
    • יותר כלי monitoring ו supportability עבור הפיתוח. בכתיבה של כל פיצ'ר - לחשוב איזו השקעה תהיה משתלמת כאשר לפיצ'ר הזה תהיה בעיה ב production. כלי supportability יכולים להציג חווית שימוש עלובה למדי - כל עוד הם עוזרים.
    • הכנסה של אוטומציה / בדיקות יחידה / בדיקות-API / בדיקות-אינטגרציה. בכל ארגון שראיתי בעשור האחרון זו הייתה המגמה - אבל אנחנו עכשיו צריכים את זה יותר.

אני מודע לכך שעדיין אין פה Design Patterns הנדסיים משמעותיים (אולי מלבד MSA) - אבל זה מה שעובד, ואנו עושים את מה שעובד - ולא רק מה שמתאים לציפיה מסוימת (ארכיטקטורה = "תרשימים של ריבועים"). סורי! :)


זהו... מקווה שנהניתם, ואולי אף השכלתם. כרגיל - אשמח לתגובות.


שיהיה לנו בהצלחה!




2015-07-12

מערכות מבוזרות - מבט עדכני

בפוסט הקודם, "בעיות של מערכות מבו..זרות", טענתי בטרוניה שבחומר הכתוב על מערכות מבוזרות - כמעט ולא מתארים מערכות מבוזרות בנות-זמננו.

קיטרתי - גם אני לא עשיתי זאת בעצמי (כמעט).

בפוסט זה ארצה לגעת בכמה מערכות מבוזרות - המאפשרות לפתור בעיות מבוזרות מבלי להיכנס לפרטים של "בחירת מנהיג" או "יצירת קונצנזוס". בעיות רבות של עיבוד מבוזר - חוזרות על עצמן, וחבל להתחיל לפתור אותן מהתחלה.

ישנם שני סוגים של מערכות מבוזרות נפוצות שאני יכול לחשוב עליהן:
  • מוצרים: אפליקציית לקוח של טורנט, Tor (רשת פרוקסי לאנונימיות), או משחק Massively Multi-player Online.
  • Frameworks: המאפשרים פתרון בעיות מבוזרות ב"רמת הפשטה גבוהה".
יש משהו מאוד קוסם בסקירה כיצד מוצרים מבוזרים עובדים - זה יכול להיות ממש מעניין!
האמת היא שיש לי (ובכלל בקהילה) - ידע מוגבל על הנושא. (הנה פוסט נחמד על Tor)

בסקירה של קטגורית Frameworks יש משהו יותר מעשי: סביר יותר שתבחרו Framework לעבוד בו - מאשר לכתוב מערת מבוזרת מ scratch, תוך כדי שאתם מושפעים ממימוש של מערכות אחרות.
כמו כן - הידע עליהן זמין ונגיש. לאחרונה התחלנו לעבוד עם Hadoop - מה שעוזרת לי לשלב את תוכן הפוסט עם העבודה השוטפת.

בסופו של דבר, גם ה Frameworks וגם המוצרים המבוזרים שמתוארים בפוסט עושים שימוש נרחב במנגנונים ("פרימיטיביים") של מערכות מבוזרות שתיארתי בפוסט הקודם: Multi-cast ורפליקציה, בחירת מנהיג, השגת קונצנזוס ועוד. חלק מכובד מהמערכות שאתאר בפוסט זה משתמש ב Zookeeper, מוצר שהזכרתי בפוסט הקודם - בכדי לבצע את הפרימיטיביים המבוזרים הללו.


תוך כדי כתיבת הפוסט - נוכחתי שאני מתמקד בקטגוריה מאוד ספציפית של מוצרים מבוזרים: מוצרי Big Data.

זה בסדר.

מוצרי Big Data לרוב מחשבים... הרבה נתונים. כ"כ הרבה נתונים שלא ניתן לאחסן במחשב אחד. לרוב גם לא בעשרה.
כדי לטפל בכמויות הנתונים (Volume) או במהירות שלהם (Velocity) - זקוקים למערכת מבוזרת. אותם מוצרים (או Frameworks) הם גם במובן אחד "מערכת מבוזרת", וגם "מוצר Big Data". בפוסט - נבחן בעיקר את הפן המבוזר שלהן.

בכל מקרה: כנראה שחלק מכובד מאוד מאנשי התוכנה שמתמודדים עם מערכות מבוזרות - עושים זאת דרך עבודה עם מוצרי Big Data, הפופולריים כ"כ היום.


בקיצור: זהו איננו פוסט על Big Data. יום אחד אולי אקדיש לנושא פוסט ראוי, ואולי גם לא :)








חישוב מבוזר


Scatter - Gather הוא השם המקובל לדפוס חישוב מבוזר, המבוסס על 2 רעיונות:
  • חישוב הוא מהיר יותר על מערכת מקומית (גישה לזיכרון מהירה מגישה ברשת, גישה לדיסק מקומי לרוב תהיה מהירה מגישה לדיסק מרוחק) - ולכן נבצע את החישוב היכן שנמצאים הנתונים.
  • למחשב יחיד אין דיי שטח אכסון לכל הנתונים של "חישוב גדול", וגם אם היה לו - לא היה לו את כח החישוב המספיק לעבד את כל הנתונים הללו בזמן סביר.
נ.ב. "זמן סביר" הוא יחסי. לא נדיר להיתקל בשאילתות שרצות על מערכות Hadoop במשך שבועות - עד לסיום החישוב.



Map Reduce - הוא היישום הידוע של פרדיגמת Scatter-Gather, שנחשב היום אמנם כבוגר - ומובן היטב.

הרעיון הוא לחלק את החישוב המבוזר לשני שלבים:
  • Map - עיבוד נתונים נקודתי על המכונה הלוקאלית (חיפוש / פילטור / עיבוד נתונים וכו')
  • Reduce - ביצוע סיכום של החישוב וחיבור התשובות של המחשבים הבודדים לתשובה אחת גדולה.
החלוצים בתחום של Map Reduce מבוזר על פני מספר רב של מחשבים הם חברת גוגל, ופרויקט Apache Hadoop - שהושפע רבות ממאמרים שגוגל פרסמה על מימוש ה Mapreduce שלה. בעשור מאז הושק הפך פרויקט Hadoop לסטנדט בתעשייה, עם קהילה עשירה ורחבה. כאשר הרווחים מתמיכה ב Hadoop הגיעה למאות מיליוני דולרים בשנה - נכנסו לשם גם שחקני ה Enterprise הגדולים (כלומר: EMC, IBM, Oracle, וכו')
.
Google Map-reduce ו Hadoop מבוססים על מערכת קבצים מבוזרת (GFS ו HDFS בהתאמה), ותשתית לביצוע חישוב מבוזר על גבי ה nodes שמחזיקים את הנתונים הנ"ל (ב Hadoop נקראת YARN, בגוגל כנראה פשוט "Mapreduce").

בפועל יש ב Map reduce שלושה שלבים עיקריים: Reduce, Map ו Shuffle (העברת סיכומי הביניים בין ה Mappers ל Reducers - פעולה אותו מספק ה Framework).

דוגמה לבעיית מיון מבוזרת (הניחו שמדובר במיליארדי רשומות) הנפתרת בעזרת MapReduce.
המפתחים כותבים את פונקציות ה Map וה Reduce, ה framework מספק להם את ה Shuffle (העברת הנתונים בין ה mappers ל reducers).

ניתן לכתוב פונקציות "Map" ו "Reduce" בקוד, אך סביר יותר להניח שכאשר אתם מגדירים חישובי Map-reduce רבים - תעדיפו להשתמש ברמת הפשטה גבוהה יותר כגון אלו שמספקים Pig (מן "שפת תכנות" מיוחדת ל Map-Reduce) או Hive (המבוסס על SQL).

כדרך אגב: Pig קיבל את שמו מכיוון שחזירים אוכלים גם צמחים וגם בשר - ו Pig יודע לטפל גם ב structured data וגם ב unstructured data.



השנים עברו, וכיום המודל של חלוקת החישוב המבוזר לשלבי Map ו Reduce  - נחשב מעט מוגבל.

כמו כן, ההסתמכות על מערכת קבצים כבסיס לחישוב היא טובה ל DataSets ענק שלא ניתן להכיל בזיכרון (יש clusters של Hadoop שמנתחים עשרות, ואולי מאות PetaBytes של נתונים), אבל בעשור האחרון הזיכרון הפך לזול (מכונה עם 1TB זכרון היא בהישג יד) - והיכולת לבצע את החישוב בזיכרון יכולה להאיץ את מהירות החישוב בסדר גודל או שניים, ואולי אף יותר (תלוי בחישוב). עובדה זו מקדמת את הפופולריות של הפתרונות לחישוב מבוזר מבוסס-הזיכרון (כמו Storm, או Spark), ומשאירה ל Hadoop את היתרון בעיקר עבור מאגרי הנתונים הבאמת-גדולים (נאמר: 100TB ויותר של נתונים, מספר שבוודאי ישתנה עם הזמן).


המוצרים העיקריים ב Eco-System של Hadoop


דוגמה נוספת לדפוס של Scatter - Gather הוא הדפוס בו עושים שימוש ב Spark. אני לא בטוח שזהו שם רשמי - אך אני מכיר את המינוח "Transform - Action" (כאשר "map" הוא פעולת transform אפשרית, ו "reduce" הוא action אפשרי).

Spark הוא הכוכב העולה של עולם ה Big Data. בניגוד ל Hadoop הוא מאפשר (כחלק מהמוצר) מספר רב יותר של פונקציות מאשר "map" ו "reduce".
במקום פעולת ה map ניתן להשתמש בסמנטיקות כגון map, filter, sample, union, join, partitionby ועוד...
במקום פעולת ה reduce ניתן להשתמש בסמנטיקות כגון reduce, collect, count, first, foreach, saveAsXxx ועוד.

מעבר לגמישות הזו - לא חייבים לתאר את הבעיה כ-2 שלבי חישוב, אלא ניתן להגדיר גרף חישוב גמיש כמעט לחלוטין.
עניין זה חשוב - כי יש בעיות חישוביות שפשוט לא מתמפות יפה ל 2 השלבים הברורים האלו. למשל: כאשר יש צורך להשתמש באותם נתוני-מקור כמה פעמים במהלך תהליך החישוב.

המוצרים ב Eco-System ה מתהווה של Spark (חלקם מקבילים לכלים קיימים של Hadoop). מקור: databricks

ל Spark יש עוד שתי תכונות חשובות:
  • הוא מחזיק (סוג של Cache) את הנתונים בזיכרון - אליו הגישה מהירה בסדרי גודל מאשר בדיסק.
  • הוא מרכז את רוב שרשרת החישוב באותו node ומצמצם את הצורך להעביר נתונים בין nodes ב cluster (גם ל Hadoop יש כלים שעוזרים לצמצם העברות של נתונים בין nodes, אך Map-reduce פחות מוצלח בכך באופן טבעי)
פרטים נוספים:
  • ה "agent" של Spark רץ במקביל למערכת ה storage המבוזר, תהיה זו Hadoop או Cassandra (או סתם קבצים בפורמט נתמך) - ושואב ממנו את הנתונים הגולמיים לחישוב.
  • ה Agent (נקרא node - אני נמנע משם זה בכדי למנוע בלבול) מקבל Job (לצורך העניין: קטע קוד) לבצע חישוב מסוים על הנתונים. יש כבר ב Agent קוד של פונקציות שימושיות (filter, sort, join, וכו' - לפעולות "map" או count, reduce, collect - לפעולות "reduce").
  • ה Agent מחלק את המשימה ל stages ובונה גרף חישוב (על בסיס הידע כיצד מאורגנים הנתונים ב cluster) - ושולח tasks חישוביים ל Agents ב cluster.

ל Spark יש לא מעט נקודות אינטגרציה עם Hadoop - כך שהתחרות באמת היא לא בין Hadoop ל Spark (לפחות - עדיין), אלא יותר תחרות בין Spark ל YARN (מנוע ה Map-reduce).




Event Processing


Spark ו Hadoop הם מנועים לחישוב אצוות (batch): יש טריגר מסוים שמתחיל את החישוב - והחישוב מתבצע לאורך זמן מסוים (עדיף שיהיה קצר ככל האפשר, אך הוא לא מוגבל בזמן) - עד שמתקבלת התוצאה.

זן אחר של חישוב הוא חישוב ב Streaming: ביצוע חישובים קטנים רבים, כחלק משטף של אירועים שנכנס למערכת.
בקטגוריה זו ניתן לכלול את Spark Streaming (המבוסס על Spark), אך אתמקד דווקא ב Storm - שנבנה לצורך זה במקור.


Storm

  • מערכת Horizontal scalable ו fault-tolerant - כמובן!
  • מערכת המספקת תשובות ב low-latency (שלא לומר real-time - כי זה פשוט לא יהיה נכון), תוך כדי שימוש בכוח חישוב מבוזר.
  • At-lest-once Processing מה שאומר שהנחת היסוד במערכת המבוזרת היא שחלקים שלה כושלים מדי פעם, ולכן לעתים נעשה אותו חישוב יותר מפעם אחת (לרוב: פעמיים) - בכדי להמשיך לקבל תוצאות גם במקרה של כשל מקומי.

Storm נולדה מתוך פרויקט בטוויטר לחישוב trending analysis ב low-latency.

המבנה של Storm Computation Cluster הוא סוג של Pipes & Filters, כאשר יש:
  • Stream - רצף של tuples של נתונים.
  • Spout - נקודת אינטגרציה של המערכת, המזרימה נתונים ל streams.
  • Bolt - נקודת חישוב ("Filter") בגרף החישוב ("הטופולוגיה").
    על מערכת Storm מריצים טופולוגיות חישוב רבות במקביל - כל אחת מבצעת חישוב אחר.
  • Supervisor - הוא node של Storm, שכולל כמה Workers שיכולים להריץ את קוד ה bolts או ה spouts.
  • Stream Grouping - האסטרטגיה כיצד לחלק עבודה בין bolts שונים.
בעוד פעולות אצווה סורקות את הנתונים מקצה אחד לקצה שני - וחוזרות עם תשובה ("42"), ב Event Processing כל הזמן נכנסים נתונים, והערך אותו אנו מחשבים (אחד או יותר) - יתעדכן באופן תדיר ("43, לא רגע... עכשיו 47... ועכשיו 40...").

טופולוגיה לדוגמה של Storm. מקור: michael-noll.com

אם העבודה היא חישובית בלבד - כל אחד מה supervisors יוכל להריץ את כל הטופולוגיה (ואז לא צריך להעביר נתונים בין מחשבים במערכת - שזה עדיף). נוכל לבחור, למשל, ב Local Stream Grouping - שמורה ל Storm לשלוח את הנתונים ל Worker על אותה המכונה.

במידה ויש נתונים מסוימים שיש להשתמש בהם לצורך החישוב, והם נמצאים על supervisors מסוימים - יש להגדיר את Stream Grouping כך שיידע לשלוח את הנתונים ל supervisor הנכון. במקרה כזה אולי נרצה לבחור ב Fields Stream Grouping - המורה ל Storm לשלוח את הנתונים למחשב ע"פ שדה מסוים בנתונים (למשל: "ארץ" - כי כל supervisor מחזיק נתונים של ארץ אחרת).


הערכה גסה איזה כלי יכול להתאים לכם - בהינתן בזמני ההמתנה לתשובה שאתם מוכנים לסבול.
* Tez הוא מנוע שכולל שיפורים ל Map-Reduce, שמרביתם מזכירים את הארכיטקטורה של Spark - מלבד הגמישות לתאר תהליכים כיותר מ-2 שלבים עיקריים.



Mahout


ארצה להזכיר בקצרה את Apache Mahout (שהוא חלק מ Hadoop), פירוש השם Mahout הוא "נהג הפיל" - ואכן Mahout רוכבת על הפיל (Hadoop).
זהו כלי שמספק סט של אלגוריתמים שימושיים הממומשים בצורה מבוזרת, חלקם על גבי Map-Reduce וחלקם על גבי Spark - ומאפשר לעבוד ברמת הפשטה גבוהה אפילו יותר.

סוגי הבעיות ש Mahout מכסה (כל אחת - בעזרת כמה אלגוריתמים שונים) הן:
  • סינון שיתופי - "אנשים שאהבו את x אהבו גם את y"
  • Classification - אפיון "מרחב" הנתונים מסוג מסוים - גם אלו שאין לנו. סוג של חיזוי.
  • רגרסיות - הערכת הקשרים בין משתנים שונים.
  • Clustering - חלוקה של פריטי מידע לקבוצות - ע"פ פרמטרים מסוימים.
  • ועוד
מימושים של האלגוריתמים  הנ"ל תוכלו למצוא בקלות למכונה יחידה, אבל אם יש לכם סט גדול של נתונים, או שאתם רוצים לחשב את האלגוריתמים בצורה תכופה - Mahout יחסוך לכם עבודה רבה!




.

Lambda Architecture



הכלים שהצגנו למעלה - מאפשרים לנו בחירה בין:
  • תשובה מדויקת - אטית (למשל: Map-reduce)
  • תשובה "לא מדויקת" - מהירה (למשל: Storm או Spark)
    בהנחה שאנו מקריבים דיוק עבור המהירות. למשל: משתמשים בחישוב בנתונים בני חמש דקות - ולא העדכניים ביותר, או חישוב על סמך דגימה חלקית - ולא על סמך כלל הנתונים.
בארגון בעל צרכי מידע - למה לבחור בין האופציות? האם אי אפשר לקבל את שניהם?

מקור: MapR


סוג של שילוב מקובל בין 2 הגישות נקרא "ארכיטקטורת למבדה", ספק אם על שם האות היוונית למבדה (Λ) שאם מטים אותה על הצד מזכירה מאוד את צורת הארכיטקטורה - ואולי בהקשר ל Lambda Calculus המבוססת על פישוט המערכת לפונקציות (בהקשר של תכנות פונקציונלי). הנחות היסוד של הארכיטקטורה הן:
  • מייצרים במערכת התפעולית שטף של אירועים - כולם immutable (בתרשים למעלה: "new data stream").
  • את האירועים אוספים ומפצלים ל-2 ערוצים:
    • ערוץ של שמירה לטווח ארוך - נוסח HDFS או AWS S3, ואז ביצוע שאילתות ארוכות באצווה (Batch).
    • ערוץ של עיבוד מיידי והסקת תובנות - נוסח Storm, שבו ההודעות ייזרקו מיד לאחר שיעובדו (הן שמורות לנו מהערוץ הראשון) - הערוץ המהיר.
  • את התובנות קצרות הטווח של הערוץ המהיר, ואת התובנות ארוכות הטווח - של ערוץ האצווה מרכזים ל storage שלישי (Service Layer) - לרוב HBase, Impala או RedShift, ממנו נבצע שליפות חזרה למערכת התפעולית.

כמו ארכיטקטורה, "ארכיטקטורת למבדה" היא רק תבנית מקובלת - ונכון יותר יהיה להתאים אותה לצרכים הייחודיים שלכם, ולא להיצמד בכוח לאיזו הגדרה מקובלת..




Apache Kafka


לסיום אני רוצה להזכיר עוד מערכת מבוזרת שהופכת פופולרית - Distributed Message Queue בשם קפקא.

בדומה למערכות הקודמות, קפקא היא מערכת Horizontally scalable ו Fault-tolerant.
בנוסף יש לה מימד של durability - הודעות נשמרות לדיסק ומשוכפלות בין מחשבים שונים.

בקפקא נלקחו בה כמה החלטות תכנוניות בכדי לתמוך בשטף אדיר של אירועים. קפקא מטפלת יפה בקצבים גבוהים מאוד של נתונים, במחיר של latency מסוים בכתיבת ובשליפת הודעות. שמעתי על חבר'ה שחווים latency אף של ל 2-3 שניות.

כלל-אצבע ששמעתי אומר כך: "אם יש לך עד 100,000 הודעות בשנייה - השתמש ב RabbitMQ, אם יש יותר - כדאי לשקול מעבר לקפקא".




כמה הנחות בעולם של קפקא הן:
  • הודעות ב Queue הן immutable - בלתי ניתנות לשינוי. ניתן להוסיף הודעות ל queue - ולקרוא אותן, אך לא לעדכן או למחוק אותן. ההודעות יימחקו כחלק מתהליך של ה Queue (למשל: לאחר 24 שעות) - והצרכן צריך לרשום לעצמו ולעקוב אלו הודעות כבר נקראו - כדי לא לקרוא אותן שוב. הן פשוט יושבות שם - כמו על דיסק.
    קפקא מנהלת את ההודעות כקובץ רציף בדיסק - מה שמשפר מאוד את יעילות ה I/O שלה. כל הפעולות נעשות ב Bulks.
  • ה Queue נקרא "Topic" ולא מובטח בו סדר מדויק של ההודעות (פרטים בהמשך).
  • Broker הוא שרת (או node) במערכת. אם לברוקר אין מספיק דיסק בכדי לאכסן את תוכן ה Topic, או שאין לו מספיק I/O בכדי לכתוב לדיסק את ההודעות בקצב מהיר מספיק - יש לחלק (באחריות ה Admin האנושי) את ה topic על גבי כמה partitions. ההודעות שמתקבלות ישמרו על partitions שונים, וקריאת ההודעות - גם היא תעשה מה partitions השונים.
קפקא עצמה מאוד מרשימה מבחינת יציבות וביצועים - אך זה נעשה במחיר אי-החבאת מורכבויות של מימוש מהמשתמשים שלו:

ה producer צריך לדעת על מספר ה partitions שבשימוש, מכיוון שקפקא מסתמך על כך שהוא עושה בעצמו מן "Client Side Load Balancing" (דומה לרעיון של Client-Side Directory שהסברתי בפוסט הקודם) - ומפזר את ההודעות בין ה partitions השונים (או שכל producer עושה זאת בעצמו - או שהם עושים זאת - כקבוצה).
הפיזור כמובן לא צריך להיות אקראי אלא יכול להיות ע"פ שדה מסוים בהודעה, שדה כמו "ארץ" - שיש לו משמעות עסקית.

ה consumer לא יכול לקרוא את ההודעות ב topic ע"פ הסדר - כי הן מפוזרות בין ה partitions השונים (קריאה נעשית רק מ partition מסוים). אין שום מנגנון בקפקא שמסייע להבין באיזה partition נמצאת ההודעה הבאה. בפועל מה שעושים הוא שמציבים consumer לכל partition - והם מעבדים את ההודעות במקביל, מבלי להתייחס להודעות שב partitions האחרים.
כמו כן - מישהו צריך לעקוב מהי ההודעה האחרונה שנקראה מכל partition: קפקא לא עושה זאת. הדרך המקובלת לעשות זאת היא להיעזר ב Zookeeper לניהול ה state המבוזר הזה (שיהיה יציב, זמין, וכו').
ב AWS Kinesis (וריאציה של Kafka - של אמזון) - מנהלים את ה state המבוזר ב DynamoDB (בסיס נתונים K/V - גם הוא מבוזר).


כל partition ממספר את ההודעות של ה topic באופן בלתי-תלוי. מקור: kafka.apache.org

לא מתאים לכם? - אתם יכולים לסדר את המידע בחזרה בעזרת מערכת נוספת שתעשה את העבודה (למשל: Storm?).

לכל Broker יש רפליקה (אחת או יותר), במבנה של Active-Passive. ה Active משרת את ה consumer - ואם הוא כושל, אחד ה Passives נבחר להיות ה leader - והופך להיות "ה Active החדש".

ה producer הוא האחראי על מדיניות ה consistency ויכול לבחור להחשיב כתיבה ל queue כמוצלחת כאשר ה Leader קיבל אותה, או רק כאשר ה Leader סיים לעשות רפליקציה של הנתונים ל broker נוסף (אחד או יותר - תלוי ברמת הפרנויה שלכם). התוצאה כמובן היא trade-off בין latency ל durability - שניתן לשחק בו בצורה דינאמית.

אפשר להשוות את קפקא למשאית: היא מאוד לא נעימה לצורך יציאה לדייט עם בחורה, או בכדי לקנות משהו בסופר.
אבל אם אתם רוצים להעביר אלפי טונות של מוצר כלשהו - בוודאי שתרצו כמה משאיות שיעשו את העבודה. צי של רכבים קטנים - פשוט לא יעשה עבודה טובה באותה המידה...



סיכום


בפוסט אמנם הצגתי הרבה מוצרי "Big Data" - אך הכוונה הייתה להתמקד באספקטים המבוזרים שלהם.
מערכות מבוזרות מוסיפות סיבוך למערכת - ולכן כדאי להימנע מהן במידת האפשר. כן... ברור לי שלא-מעט מהנדסים מחפשי-אתגר ימצאו את הדרך להשתמש במערכות מבוזרות - גם שאין צורך עסקי לכך.

יש הגיון מסוים, שבמקום לכתוב באסמבלי מערכת מבוזרת בעצמנו - נשתמש במערכות קיימות. המערכות שהצגתי בפוסט זה הן דוגמה טובה לכך. מכאן-לשם, חלק מהמערכות לא יעשו בדיוק את מה שאתם זקוקים לו, וזה רגע טוב לצלול לשכבת הפשטה נמוכה יותר (כמו Zookeeper - או קוד שלכם) - ולפתור נקודתית את הבעיה.


שיהיה בהצלחה!



----

לינקים רלוונטיים:

השוואה בין Spark ל YARN:
http://www.quora.com/What-is-the-difference-between-Apache-Spark-and-Apache-Hadoop-Map-Reduce

מצגת טובה אודות Spark:
http://stanford.edu/~rezab/dao/slides/lec1.pdf