Sien Festival 05.07.2024

Источник: instagram.com

05.07.2024 я по приглашению от bokeh посетил фестиваль электронной музыки Sien, который организовывал коллектив Zvuk.
Фестиваль Sien позиционирует себя как мультикультурный, изобретательный, открытый для всех и любопытный ко всему фестиваль, выражающий себя через музыку, технологии, танцы, эксперименты и перформансы. Сам фестиваль проводится в 4 дня, но мы посетили лишь второй день, где планировался рейв с привозом зарубежных диджеев и местных артистов.

К слову о диджеях из представленного line-up я не знал никого, но как мы помним по рейву от SHU[LAMA] меня это нисколько не остановило.

Тайм-лайн выступления артистов. Источник: instagram.com

Место проведения и организация

В качестве места проведения организаторы выбрали зону отдыха Balyqty Bulaq, где в обычное время можно попариться в баньке, прокатиться на лошадях, половить форель, попробовать кумыс и расположиться в юртах. Зона отдыха находится в 15 минутах на такси от центра Алматы и представляет из себя открытое пространство рядом с горами.

Источник: 2GIS

Приехали мы к 21:00 и уже тогда шел небольшой дождь, но мы ребята опытные, подготовились и оделись в теплую одежду. По приезде я сразу обратил внимание, что на зоне отдыха много мест, где можно посидеть, перекусить или выпить чего-нибудь бодрящего, а также важное два туалета. Ознакомившись с обстановкой на улице и вооружившись водой мы пошли искать сцены их было 3:

  1. Техно сцена
  2. Хаус сцена
  3. Экспериментальная музыка и селекторы

К слову техно сцену мы нашли быстро, так как оттуда доносились звуки прямой 1/4 бочки, туда мы и направились первым делом.

Выступления артистов

Техно сцена разместилась под шатром с некоторой инсталляцией в виде свисающих змей, дымовой машиной и неоновой подсветкой. Все в традиционной камерной атмосфере. По звуку было все хорошо, высокие не свистят, низкие не гудят. Первой по тайм-лайну выступала Mariya El

Mariya El — 1

Mariya El местами выдавала классные треки, но она открывающий диджей и уже несется такое техно как будто бы уже пик тайм вечеринки и конечно много смешения разных жанров привело к разной энергии на танцполе.

Mariya El — 2

bokeh в Алмате не знают что такое warm up, у них всегда техно с 140+ BPM на открытии :D

Mariya El — 3

А пока Mariya El рубит техно в 21:00 по-местному времени мы решили пойти посмотреть две другие сцены.

Akkujava

Хаус сцена полностью была open-air, что потом с ней сыграло злую шутку. Там играл прикольный хаус, который больше подходил под открытие мероприятия.

DekmantelSoundSystem

Ближе к 00:00 начал идти сильный дождь и все мигом разбежались с хаус сцены на техно сцену или укрываться под навесами, но благо ливень быстро закончился и мы продолжили зависать на хаус сцене пока не начали играть хэдлайнеры.

Bismildin

Что относительно экспериментальной сцены, то она расположилась в самой настоящей традиционной казахской юрте, где можно было посидеть, перевести дух с техно сцены.

Johannes Astrup — 1

И вот в полночь начинается жара, выходит первый хэдлайнер Johannes Astrup. Его уникальный стиль характеризуется быстрым техно с богатыми слоями ритма и перкуссии, а также включает в себя элементы грува и даже сальсы.

Johannes Astrup — 2

Нетипичный прикид, улыбка на лице и танцы от Йоханнеса заряжают весь танцпол

Johannes Astrup — 3
Johannes Astrup — 4

Следующим за Johannes выступает француз Mac Declos. Уже в 19 лет он получил возможность выступить по-крупному на фестивалях Nördik Impakt и Astropolis. На счету музыканта выступления в Berghain, Fuse, HÖR, PAL, Rex Club и на множестве других именитых площадок Европы. И можно сразу сказать он отменный диджей, умеет круто сводить треки и использовать эффекты по полной. Мой персональный фаворит.

Mac Declos — 1

Послушайте как он виртуозно использует вокальный семпл и подмешивает его с другим треком в лайв режиме, джог при выходе из ямы. Просто класс!

Mac Declos — 2
Mac Declos — 3
Mac Declos — 4
Mac Declos — 5

Следом за французом выступает Neri J — основательница Vortex parties и резидентка Den Anden Side. Тяжёлые удары с высокой интенсивностью и безостановочной подачей — вот отличительные черты одной из самых ярких звёзд копенгагенского андеграунда, которая выпускается и тесно сотрудничает с потрясающим лейблом BunkerBauer.

Neri J  — 1

С Neri J пошел крутой техно-транс и я решил устроить съемку с ракурса позади диджея :D

Neri J  — 2
Neri J  — 3

Один из закрывающих треков я впервые услышал lead партию за все мероприятие и как же он хорошо зашел после нескольких часов одного грува, перкусий и драмсов, потрясающие эмоции!

Neri J  — 4

Это был последний диджей кого мы полноценно послушали.

Итого

Я получил одно удовольствие вновь оказаться в атмосфере танцующих людей в камерном помещении с крутой музыкой и хорошей организацией мероприятия. Еще раз проверил на прочность свои кастомные беруши, и они вновь показали себя с лучшей стороны. Спасибо bokeh за приглашение, надо будет обязательно повторить!

Apache Airflow

Предпоссылки создания Apache Airflow

В 2014 году компания Airbnb стремительно развивалась, что привело к увеличению объема данных и усложнению рабочих процессов обработки данных. Управление этими сложными конвейерами данных с помощью традиционных инструментов, таких как Cron и пользовательские скрипты, стало нецелесообразным из-за недостаточной масштабируемости, гибкости и возможностей мониторинга. Airbnb требовалось решение, которое могло бы масштабироваться в соответствии с растущими потребностями в данных.Им требовалось отслеживать состояния рабочих процессов и возможность эффективно отлаживать сбои. Инициатива со стороны Maxime Beauchemin привела к созданию масштабируемого, гибкого и удобного инструмента управления рабочими процессами под названием Airflow. Сделав Airflow open-sourceным и наладив взаимодействие с сообществом, Airbnb не только решила свои внутренние проблемы, но и предоставила мощный инструмент широкому сообществу разработчиков. Сегодня Apache Airflow широко используется в различных отраслях индустрии для организации сложных рабочих процессов и конвейеров данных.

Apache Airflow — фреймворк для построения конвейров обработки данных. Airflow сам по себе не является инструментом обработки данных. Он управляет различными компонентами, которые отвечают за обработку ваших данных в конвейерах. Ключевая особенность Airflow заключается в том, что он позволяет легко создавать конвейеры обработки данных, запускаемых по расписанию с помощью языка Python.

Какие задачи решает Airflow?

  1. Оркестрация рабочих процессов: Airflow позволяет управлять зависимостями между задачами, обеспечивая их выполнение в нужном порядке
  2. Планирование задач: С помощью Airflow можно запланировать выполнение задач по заданному расписанию
  3. Мониторинг и управление задачами: Airflow предоставляет интерфейс для отслеживания выполнения задач, просмотра логов и управления задачами в реальном времени
  4. Интеграция с различными системами: Airflow может взаимодействовать с различными источниками данных и системами, такими как базы данных, облачные сервисы, API и другие.
  5. Автоматизация процессов: Airflow помогает автоматизировать повторяющиеся процессы, что снижает ручной труд и минимизирует ошибки.
  6. Логирование и отслеживание изменений: Airflow сохраняет логи выполнения задач, что помогает в отладке и анализе процессов.

Кому подойдет Airflow

— Для планирования задач, где возможностей Cron стало недостаточно
— У команды уже есть достаточная экспертиза в программировании на Python
— На проекте используется пакетная обработка данных (Batch), а не потоковая (Stream). AirFlow предназначен для Batch-заданий, для потоковой обработки данных лучше использовать Apache NiFi
— Планируется или уже осуществлен переход в облако и необходим надежный оркестратор, поддерживающий все принципы Cloud-Native

Конвейеры обработки данных в виде графов

Конвейер обработки данных легко представить в виде графа. В математике граф представляет собой конечный набор узлов с вершинами, соединяющими узлы. В контексте разработки данных каждый узел в графе представляет собой задачу, а зависимостями между задачами — направленные ребра между узлами. Ребро, направленное от задачи A к задаче B, указывает, что задача A должна быть завершена до того, как может начаться задача B. Такие графы обычно называются ориентированными, или направленными, потому что ребра имеют направление.

Пример ациклического направленного графа. Источник: airflow.apache.org

Данный тип графа обычно называется ориентированным ациклическим графом, поскольку он содержит ориентированные ребра и у него нет никаких петель или циклов (ациклический).

Пример циклического направленного графа

Анатомия DAG в Airflow

DAG (Directed Acyclic Graph) — направленный ациклический граф необходимый для смыслового объединения изолированных задач, которые необходимо выполнить в строго определенной последовательности согласно указанному расписанию. Задачами же в терминологии Airflow называют Task. Под задачами понимается, например, загрузка из различных источников, их агрегирование, очистка от дубликатов, сохранение полученных результатов и прочие ETL-процессы. На уровне кода задачи представляют собой Python-функции или Bash-скрипты в терминологии Airflow их называют Operator.

Task — это внутренние компоненты для управления состоянием operator и отображения изменений состояния (например, запущено/завершено) для пользователя. Task управляет выполнением оператора.

Operator — отвечает за выполнение одной единицы работы. Операторы — это некие готовые шаблоны, которые можно переиспользовать, реализующие логику выполнения (запуск скрипта, команды, обращение к СУБД) с набором параметров на входе. В AirFlow богатый выбор встроенных операторов. Кроме этого, доступно множество специальных операторов — путем установки пакетов поставщиков, поддерживаемых сообществом. Также возможно добавление пользовательских операторов — за счет расширения базового класса BaseOperator.

Sensor — это особая группа операторов, которые позволяют в случае наступления определенного события запустить выполнение следующих по зависимости задач. В качестве триггера могут выступать получение некоторого файла, данных, завершение другого DAGа или Task и так далее.

Так как Airflow оперирует языком Python, то у класса DAG могут быть свои экземпляры. В Airflow экземплярами DAGа называют DAG Run, связанные с этим DAGом экземляры задач — task Instance, а время запуска этих DAGов — execution_date.

Визуализация DAGа. Источник: cloud.vk.com

Архитектура Airflow и принципы его работы

Web Server — компонент, который позволяет визуализировать зависимости в Airflow, анализируемые планировщиком, и предоставляет пользователям основной интерфейс для отслеживания выполнения графов и их результатов. Помимо этого представления, Airflow также предоставляет подробное древовидное представление, в котором показаны все текущие и предыдущие запуски соответствующего DAG. Также здесь приводится беглый обзор того, как работал DAG, и оно позволяет покопаться в задачах, завершившихся сбоем, чтобы увидеть, что пошло не так.

Web UI Airflow. Источник: airflow.apache.org

Metadata DB (база метаданных) — собственный репозиторий метаданных на базе библиотеки SqlAlchemy для хранения глобальных переменных, настроек соединений с источниками данных, статусов выполнения Task Instance, DAG Run и так далее. Хранит состояния scheduler, executor и веб-сервера. Требует установки совместимой с SqlAlchemy базы данных, например, MySQL или PostgreSQL.

Scheduler (планировщик) — служба, отвечающая за планирование в Airflow и отправку задания на исполнение. Отслеживая все созданные Task и DAG, планировщик инициализирует Task Instance — по мере выполнения необходимых для их запуска условий. По умолчанию раз в минуту планировщик анализирует результаты парсинга директории dags и проверяет, нет ли задач, готовых к запуску.Если все в порядке,то начинает планировать задачи DAG для выполнения, передавая их воркерам Airflow. Для выполнения активных задач планировщик использует указанный в настройках Executor. Для определенных версий БД (PostgreSQL 9.6+ и MySQL 8+) поддерживается одновременная работа нескольких планировщиков — для максимальной отказоустойчивости.

Executor (исполнитель)— часть scheduler, механизм, с помощью которого запускаются экземпляры задач. Работает в связке с планировщиком в рамках одного процесса.

Worker (рабочий) — отдельный процесс, в котором выполняются запланированные задачи. Размещение Worker — локально или на отдельной машине — определяется выбранным типом Executor. Допускается неограниченное число DAG за счет модульной архитектуры и очереди сообщений. Worker могут масштабироваться при использовании Celery или Kubernetes.

DAG Directory — каталог исполняемых DAGов для остальных компонентов Airflow

Архитектура Apache Airflow. Источник: linkedin.com

Концепция планирования в Airflow

Выполнение работы с фиксированными интервалами
Для многих рабочих процессов, включающих временные процессы, важно знать, в течение какого временного интервала выполняется данная задача. По этой причине Airflow предоставляет задачам дополнительные параметры, которые можно использовать, чтобы определить, в каком интервале выполняется задача. Самый важный из этих параметров — execution_date, который обозначает дату и время, в течение которых выполняется DAG. execution_date (начиная с версии Airflow версии 2.2 logical_date) — это не дата, а временная метка, отражающая время начала интервала, для которого выполняется DAG. Время окончания интервала указывается другим параметром, next_execution_date (data_interval_end в поздних версиях Airflow 2). Вместе эти даты определяют всю продолжительность интервала задачи.

Объяснение интервалов в DAG. Источник: Книга. Apache Airflow и конвейеры обработки данных

Предположим, что у нас есть DAG, который следует ежедневному интервалу, а затем учитывает соответствующий интервал, в котором должны обрабатываться данные за 2019-01-03. В Airflow этот интервал будет запускаться вскоре после 2019-01-04:00:00, потому что в данный момент мы знаем, что больше не будем получать новые данные за 2019-01-03. Значение переменной execute_date при выполнении задач будет 2019-01-03. Это связано с тем, что Airflow определяет дату выполнения DAG как начало соответствующего интервала. Дата выполнения отмечает интервал, а не момент фактического выполнения DAG.

Запуск через равные промежутки времени
Airflow можно запускать через равные промежутки времени, задав для него запланированный интервал с помощью аргумента schedule_interval при инициализации DAG.

dag = DAG(
dag_id="dag_1", schedule_interval="@daily" <-- Планируем запуск каждый день в полночь
, start_date=dt.datetime(2019, 1, 1) <-- Дата и время начала планирования запусков DAG
... )

Airflow также нужно знать, когда мы хотим начать выполнение DAG, указав дату запуска. Исходя из этой даты, Airflow запланирует первое выполнение нашего DAG. Airflow запускает задачи в конце интервала. Если разработанный DAG вывели на продуктив 1 января 2019 года в 14:00 с start_date — 01-01-2019 и интервалом @daily, то это означает, что первый DAG Run случится в полночь 02-01-2019:00:00.

Интервалы на основе Cron
Для поддержки более сложных вариантов Airflow позволяет определять интервалы, используя тот же синтаксис, что и у cron — планировщика заданий на основе времени, используемого Unix-подобными операционными системами, такими как macOS и Linux. Достаточно указать crontab выражение в аргументе schedule_interval при инициализации DAGа.

from airflow.models.dag import DAG
import datetime

dag = DAG("regular_interval_cron_example", schedule="0 0 * * *", ...) <-- задача будет регулярно запускаться в полночь

Частотные интервалы
Если мы хотим запускать наш DAG раз в три дня, то в качестве интервала можно передать экземпляр timedelta (из модуля datetime в стандартной библиотеке Python) в schedule_interval.
— timedelta(minutes=10) каждые 10 минут
— timedelta(hours=2) каждые 2 часа

dag = DAG(
dag_id="dag_3", schedule_interval=dt.timedelta(days=3),
......)

Правила триггеров

Правила триггеров — это, по сути, условия, которые Airflow применяет к задачам, чтобы определить, готовы ли они к выполнению, ориентируясь на их зависимости. Правило триггеров по умолчанию — это all_success, которое гласит, что все зависимости задачи должны быть успешно завершены, прежде чем саму задачу можно будет выполнить.

А что произойдет, если одна из наших задач обнаружит ошибку во время исполнения? Это означает, что нижестоящую задачу уже нельзя выполнить, поскольку для ее успешного выполнения необходима успешное выполнение предшествующей задачи и она примет статус upstream_failed. Более подробно со статусами задач можно ознакомиться в официальной документации Airflow.

Airflow поддерживает и другие правила триггеров, которые допускают различные типы поведения при ответе на успешные, неудачные или пропущенные задачи. Правило none_failed означает, что оно допускает как успешные, так и пропущенные задачи, по-прежнему ожидая завершения всех вышестоящих задач, перед тем как продолжить. Например, правило all_done можно использовать для определения задач, которые завершили свое выполнение независимо от их конечного состояния. Более подробно об остальных правилах можно познакомиться в официальной документации Airflow

Web UI статусов задач. Источник: airflow.apache.org

Запуск задачи после определенного действия

Действия, связанные с запуском, часто являются результатом внешних событий. Представьте себе файл, который публикуется на общий диск, готовность данных за конкретную дату во внешней БД и т. д. Для решения подобных задач используют сенсоры, представляющие собой особый тип (подкласс) операторов. Сенсоры непрерывно опрашивают определенные условия, чтобы определить их истинность, и если условие истинно, то сенсор завершает свою работу со статусом успешно. В противном случае сенсор будет ждать и повторять попытку до тех пор, пока условие не будет истинно, или время ожидания истечет.

from airflow.sensors.filesystem import FileSensor

wait_for_supermarket_1 = FileSensor(
task_id="wait_for_data", 
filepath="data.csv",
)

Здесь FileSensor выполняет проверку на предмет наличия файла data.csv и возвращает true, если файл существует. В противном случае возвращается false, и сенсор будет ждать в течение заданного периода (по умолчанию 60 секунд) и повторит попытку.

Вывод сенсоров можно посмотреть в логах задач:

{file_sensor.py:60} INFO – Poking for file data.csv 
{file_sensor.py:60} INFO – Poking for file data.csv 
{file_sensor.py:60} INFO – Poking for file data.csv 
{file_sensor.py:60} INFO – Poking for file data.csv 
{file_sensor.py:60} INFO – Poking for file data.csv

Здесь видно, что примерно раз в минуту сенсор осуществляет покинг на предмет наличия определенного файла. Покинг — так в Airflow называется запуск сенсора и проверка условия.

Зачастую сенсоры добавляются в самое начало DAGа, так как мы хотим запустить последующие манипуляции с данными только в случае их готовности на источнике. Таким образом, сенсоры в начале DAG будут непрерывно выполнять опрос на предмет наличия данных и переходить к следующей задаче после выполнения условия.

Web UI работы сенсоров. Источник: livebook.manning.com

А что произойдет, если однажды один из источников не предоставит свои данные за ожидаемое время? По умолчанию сенсоры будут падать, как и зависимые от них задачи со статусом upstream_failed. У сенсоров есть настраиваемый аргумент timeout, который содержит максимальное количество секунд, в течение которого сенсор может работать. Сенсоры принимают аргумент timeout, который содержит максимальное количество секунд, в течение которого сенсор может работать. Если в начале очередного покинга количество секунд превысит число, заданное для timeout, то это приведет к падению сенсора.

Web UI упавших сенсоров по timeout. Источник: livebook.manning.com

Количество задач (на разных уровнях), которые Airflow может обработать ограничено. Важно понимать, что в Airflow есть ограничения на максимальное параллельное количество выполняемых задач на разных уровнях; количество задач на каждый DAG, количество задач на глобальном уровне Airflow, количество запусков DAG на каждый DAG и т. д. У класса DAG есть аргумент concurrency, определяющий, сколько параллельных задач разрешено в рамках этого DAG. Таким образом каждый сенсор — это одна задача и в случае если у вас их много и значение concurrency меньше кол-ва сенсоров, то некоторые сенсоры будут простаивать и не запускать проверку своего условия, эти сенсоры будут ожидать пока освободится слот в DAGе, то есть пока не завершит свое выполнение один из ранее запустившихся сенсоров.

Класс сенсора принимает аргумент mode, для которого можно задать значение poke или reschedule (начиная с Airflow версии 1.10.2). По умолчанию задано значение poke, что приводит к блокировке. Это означает, что задача сенсора занимает слот, пока выполняется. Время от времени она выполняет покинг, осуществляя проверку условия, а затем ничего не делает, но по-прежнему занимает слот. Режим сенсора reschedule освобождает слот после покинга, слот остается занят только когда выполняется проверка.

Запуск задач с помощью REST API и интерфейса командной строки

Запуск DAGов можно осуществлять через REST API и командную строку. Это может быть полезно, если вы хотите запускать рабочие процессы за пределами Airflow. Данные, поступающие в случайное время в бакет AWS S3, можно обрабатывать, задав лямбда-функцию для вызова REST API, запуская DAG, вместо того чтобы постоянно запускать опрос с сенсорами.

airflow dags trigger dag1

Этот код запускает dag1 с датой выполнения, установленной на текущую дату и время. Идентификатор запуска имеет префикс manu­al__, указывая на то, что он был запущен вручную или за пределами Airflow.

Для обеспечения аналогичного результата можно использовать REST API (например, если у вас нет доступа к командной строки, но к вашему экземпляру Airflow можно подключиться по протоколу HTTP).

Преимущества и недостатки Airflow

Преимущества:
— Airflow — это фреймворк с открытым исходным кодом
— Конвейеры на основе кода обладают большими возможностями расширения. Преимущество того факта, что Airflow написан на языке Python, состоит в том, что задачи могут выполнять любую операцию, которую можно реализовать на Python. Со временем это привело к разработке множества расширений Airflow, позволяющих выполнять задачи в широком спектре систем, включая внешние базы данных, технологии больших данных и различные облачные сервисы, давая возможность создавать сложные конвейеры обработки данных, объединяющие процессы обработки данных в различных системах
— Конвейеры на основе кода более управляемы: поскольку все находится в коде, он может легко интегрироваться в ваш CI / CD управления версиями и общие рабочие процессы разработчика.
— Множество интеграций с разными системами
— Гибкий планировщик
— Возможность масштабирования
— Такие функции, как обратное заполнение (backfill), дают возможность с легкостью (повторно) обрабатывать архивные данные, позволяя повторно вычислять любые производные наборы данных после внесения изменений в код;
— Многофункциональный веб-интерфейс Airflow обеспечивает удобный просмотр результатов работы конвейера и отладки любых сбоев, которые могут произойти
— Поддерживаются все принципы Cloud-Native

Недостатки:
— Обработка потоковых конвейеров, поскольку Airflow в первую очередь предназначен для выполнения повторяющихся задач по пакетной обработке данных, а не потоковых рабочих нагрузок
— Реализация высокодинамичных конвейеров, в которых задачи добавляются или удаляются между каждым запуском конвейера. Хотя Airflow может реализовать такое динамическое поведение, веб-интерфейс будет показывать только те задачи, которые все еще определены в самой последней версии DAG. Таким образом, Airflow отдает предпочтение конвейерам, структура которых не меняется каждый раз при запуске
— Поддержка Airflow может быстро стать сложной в крупных проектах
— Высокий порог входа в фреймворк

Apache Kafka

Источник: pinterest.com

Message broker — это тип построения архитектуры, при котором элементы системы «общаются» друг с другом с помощью посредника. Данная архитектура нужна чтобы доставлять сообщения из пункта А в пункт Б, причем мы предполагаем, что эти сообщения бесконечны. Таким образом потоковая передача данных отличается от пакетной (пакетная рано или поздно завершится, имеет границы и ее можно разделить на эти границы). Message broker’ы активно используются в микросервисной архитектуре, где используется событийно-ориентированный подход. Преимуществами микросервиса над монолитным приложением являются низкая связь сервисов друг с другом, устойчивость приложений к сбоям за счет изоляции поставщиков (producer) и потребителей (consumer).

Типы Message broker’ов:
point-to-point, брокеры которые работают на принципе доставки сообщения в строгой последовательности в виде очереди, где одна система пишет сообщение по принципу first in/ first out, другая очередь эти сообщения вычитывает. Примеры таких брокеров: ZeroMQ, nanomsg, Java Message service (JMS)
publish / subscribe Есть некий producer, который публикует свои сообщения, есть так называемые потребители (consumer), которые эти сообщения получают именно по подписке. Строгая последовательность доставки сообщений не гарантируется. Системы с таким типом являются более масштабируемыми. Примеры таких брокеров: Celery, ActiveMQ, Apache Kafka, IBM MQ

В терминалогии потоковых данных событие генерируется 1 раз инициатором, а затем обрабатываются различными потребителями. Инициаторов может быть много, кто передает эти данные, также и потребителей может быть много.

Назначение Message broker:
— интеграция систем, написанные на разных языках программирования и протоколах
— гарантия надежного хранения
— гарантированная доставка
— масштабирование (как источников, так и потребителей)
— преобразование сообщений

Apache Kafka — это распределенная и легко масштабируемая система обмена сообщениями, написанная на Java и Scala, с высокой пропускной способностью, которая может в режиме реального времени обрабатывать большие объемы данных. Kafka появилась из-за необходимости компании LinkedIn эффективно перемещать огромные количества сообщений — до нескольких терабайт в час.

Верхнеуровнево Kafka состоит из:
— Broker
— ZooKeeper или KRaft
— Consumer
— Producer

Источник: habr.com

Broker — это серверное программное обеспечение с доступом к своему локальному диску, в которое producer’ы записывают данные, а consumer’ы читают данные, Broker эти данные аккумулирует и правильно сохраняет. Apache Kafka кластер состоит из множества Broker’ов, которые объединены в одну сеть.

ZooKeeper — это распределенное файловой хранилище необходимое для достижения согласованного состояния и синхронизации Broker’ов. Благодаря ZooKeeper мы можем управлять кластером Kafka, добавлять новых пользователей, создавать топики, задавать им настройки, обнаруживать сбои и восстанавливать работу кластера, хранить конфигурацию и секреты, авторизационные данные и ограничения или Access Control Lists при работе консумеров и продюсеров с брокерами.

Consumer — это приложение, которое имеет модуль Kafka, с помощью которого оно может прочитать сообщение. Приложение-консумер подписывается на события и получает данные в ответ.

Producer — это приложение, которое имеет модуль Kafka, с помощью которого оно может записать событие (сообщение) в кластер Kafka. Кластер сохраняет эти события и возвращает подтверждение о записи или acknowledgement.

Брокеры

Чтобы Broker’ы знали куда нужно отправить сообщение producer’а и какие consumer’ы могут читать эти сообщения существует такое понятие как Topic

Источник: lydtechconsulting.com

Topic — это базовая, основная сущность Apache Kafka, логическое разделение коллекции связанных сообщений на группы, последовательность событий. Топик удобно представить в виде лога, в который постоянно добавляются новые данные в конец, тем самым не разрушается цепочка старых событий. Отличие топика Kafka от остальных топиков очередей тем что данные в топике Kafka нельзя удалить используя Consumer или Producer.

Topic состоит из партиций, которых может быть одна или несколько. Партиции — это главный механизм масштабирования и отказоустойчивости. 1 партиция — это 1 копия данных. Партиция может находится как на одном брокере, так и на нескольких. Сколько нужно партиций зависит от того насколько много consumer’ов читает топик и насколько часто они читают из этого топика. Если довольно часто, то в идеале для каждого consumer’а иметь свою отдельную партицию.

Сами партиции физически представлены на дисках в виде сегментов. Сегмент — это отдельный файл, хранящийся на диске брокера. Файлы можно создавать, ротировать с одного брокера на другой, удалять согласно настройке устаревания. Число сегментов может варьироваться в зависимости от интенсивности записи и настроек размера сегмента. При превышении размера сегмента создается новый.

Хранение данных

Семантически и физически сообщения внутри сегмента не могут быть удалены, они иммутабельны. Всё, что мы можем — указать, как долго Kafka-брокер будет хранить события через настройку политики устаревания данных retention policy и указать cleanup policy, когда наступает некое событие для очистки данных.

Retention policy правила, которые позволяют избавляться от устаревших данных на основании времени. При достижении порога данные помечаются на удаление. Существуют следующие опции настройки:
— по милисекундам log.retention.ms
— по минутам log.retention.minutes
— по часам log.retention.hours
Помимо этого существует size based подход, где оценивается размер сегмента, который может хранить топик:
log.retention.size

Cleanup policy состоит из:
— Delete (по умолчанию). Помечает на удаление segment при устаревании / превышении размера
— Compact. Оставляет только последние сообщения для каждого ключа (message key)
— Delete и Compact. Производится compaction и удаление согласно retention policy

Репликация данных

Для обеспечения отказоустойчивости и сохранности данных существует механизм репликации между брокерами, который имплементируется на уровне партиций:
— У каждой партиции есть настраиваемое число реплик
— Одна из этих реплик называется партицией-лидером, которая принимает все запросы на запись/чтение данных. Все остальные являются партициями-фолловерами
— Записанные данные в партицию лидера автоматически реплицируются фолловерами внутри кластера Kafka. Фолловеры подключаются к лидеру, читают данные и асихронно сохраняют к себе на диск
— Роли лидеров и фолловеров не статичны. В случае выхода из строя брокера с лидирующими партициями, роль лидера достанется одной из реплик фолловеров, а консумеры и продюсеры получат обновление о необходимости переподключиться к брокерам с новыми лидерами партиций
— Начиная с версии Kafka 2.4 консумеры могут читать с партиций фолловеров. Это полезно для сокращения задержек при обращении к ближайшему брокеру в одной зоне доступности. Однако, из-за асинхронной работы репликации, взамен вы получаете от фолловеров менее актуальные данные, чем они есть в лидерской партиции

Источник: habr.com

Что из себя представляет сообщение Kafka

Каждое событие — это пара ключ-значение. Ключ партицирования может быть любой: числовой, строковый, объект или вовсе пустой. Значение тоже может быть любым — числом, строкой или объектом в своей предметной области, который вы можете как-то сериализовать (JSON, Protobuf, …) и хранить. В сообщении продюсер может указать время, либо за него это сделает брокер в момент приёма сообщения. Заголовки выглядят как в HTTP-протоколе — это строковые пары ключ-значение. В них не следует хранить сами данные, но они могут использоваться для передачи метаданных.

Источник: habr.com

Запись данных producer’ом

Чтобы producer смог отправить данные в Kafka кластер ему необходимо знать IP адреса всех брокеров и название топика. Перед тем как producer пойдет записывать данные он опросит брокер из своего пула и уточнит какая его партиция является лидером в топике и какое местоположение этой партиции в файловой системе. Запись осуществляется только в партицию-лидер.

Продюсер определяет стратегию партицирования, она может быть как по ключу сообщения, по очереди (round-robin), так и кастомная, реализованная на стороне продюсера. По ключу сообщения одного и того же идентификатора сохраняются в одну партицию. Примером такого ключа может быть номер карты, ID клиента и т. д. Например, ключ со значением Prod_id_1 всегда будет сохраняться в партицию 0, а со значением Prod_id_2 будет сохраняться в партицию 1, то есть данные не будут распределены по всем имеющимся партициям.

Источник: javapoint

При round-robin сообщения попадают в партиции по очереди, такая стратегия хорошо работает когда нужно равномерно распределить сообщения между всеми существующими партициями и очередность не играет роли.

Источник: javapoint

Семантики доставки сообщений
В очередях есть выбор между скоростью доставки и расходами на надежность доставки.
at-most once — при доставке сообщений устраивают потери сообщений, но не их дубликаты. Это самая слабая гарантия, которую реализуют брокерами очередей
at-least once — не хотим терять сообщения, но нас устраивают дубликаты
exactly-once — хотим доставить одно и только одно сообщение ничего не теряя и ничего не дублируя. Высокая надёжность данной семантики означает большие задержки

Источник: habr.com

Надежность доставки
Надежность доставки продюсером данных в брокер осуществляется с помощью возвращения подтверждения записи данных продюсеру в партиции регулируется параметром acks.
— При значении acks=0 продюсер не получает подтверждение от брокера, что запись произошла успешно. Данная политика влечет за собой риски потери данных
— При значении acks=1 продюсер будет ожидать получения ответа об успешной записи данных от лидера партиции
— При значении acks=all продюсер ожидает подтверждения и от лидера партиции, и от фолловеров партиции. Данный вариант является самым надежным и самым трудозатратным, так как требует больше накладных расходов: мало того, чтобы нужно сохранить на диск, так ещё и дождаться, пока фолловеры отреплицируют сообщения и сохранят их к себе на диск. При включенной опции enable.idempotence сообщениям проставляется PID (идентификатор продюсера) и увеличивающийся sequence number. Таким образом обеспечивается транзакционность и в случае сбоя в сети при попытке доставить сообщение повторно сообщения с одинаковым PID будут отброшены со стороны брокера.

Чтение данных consumer’ом

Консумеры читают данные синхронно или асинхронно из лидерской партиции — это позволяет достичь консистентности при работе с данными. Информация в партиции читается слева-направо. Консумеров, читающих сообщения из топика, может быть несколько, причем читать они могут с разных позиций партиции, тем самым не мешая друг другу. Также нет какой-то привязки к чтению по времени, в зависимости от задачи консумеры могут читать спустя дни, недели, месяцы или несколько раз через какое-то время. Сама Kafka (в данном случае брокер) не следит за тем какое сообщение будет читать consumer и когда ему приходить за этими сообщениями. Consumerы сами должны ходить в Kafka и читать оттуда сообщения, сами должны говорить Kafka какие сообщения им выдать

Источник: habr.com

Offset — это позиция сообщения в очереди Kafka. Начальная позиция в сообщении называется log-start offset. Позиция сообщения, записанного последним — log-end offset. Позиция консумера сейчас — current offset. Расстояние между конечным оффсетом и текущим оффсетом консумера называют лагом — это первое, за чем стоит следить в своих приложениях. Допустимый лаг для каждого приложения свой, это тесно связано с бизнес-логикой и требованиями к работе системы.

Consumer при чтении умеет запоминать на каком offset он остановился. Для этого существует commits, их может быть два:
auto commit (по умолчанию). Он вычитывает кол-во offset (батч оффсетов) и потом фиксирует, какое кол-во offsetов он прочитал.
user commit. Можно организовать свою логику фиксирования коммитов, например, прочитал 1 offset, зафиксировал коммит. Минус такого подхода — снижение производительности

Как обеспечивается то, что консумеры читают разные данные, разные партиции, а не одни и теже:
— Для этого есть такое понятие как консумер группы (consumer groups) — это множество консумеров объединившихся в один кластер.
— Каждый консумер в группе будет читать разные сообщения. Каждый консумер пойдет в свою партицию и будет читать именно ее. Читать будет с того места где он остановился прошлый раз
— Kafka сохраняет на своей стороне текущий оффсет по каждой партиции топиков, которые входят в состав консумер-группы. Консумер в группе, после обработки прочитанных сообщений отправляет запрос на сохранение оффсета — или коммитит свой оффсет
— Распределение партиций между консумерами в пределах одной группы выполняется автоматически на стороне брокеров. Kafka старается честно распределять партиции между консумер-группами, насколько это возможно.
— Консумер группы создаются для решения разных кейсов. То есть данные могут быть одни и те же, но пользователи и задачи решаемые этими пользователями будут разные. Например, один консумер использует логи авторизаций пользователей для нужд администраторов, а другая консумер группа в виде маркетологов нужно смотреть кол-во авторизовавшихся пользователей на странице.
— Каждая такая группа имеет свой идентификатор, что позволяет регистрироваться на брокерах Kafka. Пространство имён консумер-групп глобально, а значит их имена в кластере Kafka уникальны.

Источник: habr.com

Ребалансировка консумер-групп
При добавлении новых потребителей топика происходит ребалансировка консумер-группы. Процесс ребалансировки заставляет все консумеры прекратить чтение и дождаться полной синхронизации участников, чтобы обрести новые партиции для чтения. Как только группа стала стабильной, а её участники получили партиции, консумеры в ней начинают чтение. Поскольку группа новая и раньше не существовала, то консумер выбирает позицию чтения оффсета: с самого начала earliest или же с конца latest. Топик мог существовать несколько месяцев, а консумер появился совсем недавно. В таком случае важно решить: читать ли все сообщения или же достаточно читать с конца самые последние, пропустив историю. Выбор между двумя опциями зависит от бизнес-логики протекающих внутри топика событий.

Для того чтобы понимать какие из участников группы активны и работают, а какие уже нет, каждый консумер группы в равные промежутки времени отправляет Heartbeat-сообщение. Временное значение настраивается программой-консумером перед запуском. Также консумер объявляет время жизни сессии — если за это время он не смог отправить ни одно из Heartbeat-сообщений брокеру, то покидает группу. Брокер, в свою очередь, не получив ни одно из Heartbeat-сообщений консумеров, запускает процесс ребалансировки консумеров в группе.

Процесс ребалансировки проходит достаточно болезненно для больших консумер-групп с множеством топиков. Поэтому разработчикам программ-консумеров обычно рекомендуют использовать по одной консумер-группе на топик. Также полезно держать число потребителей не слишком большим, чтобы не запускать ребалансировку много раз, но и не слишком маленьким, чтобы сохранять производительность и надёжность при чтении. Значения интервала Heartbeat и время жизни сессии следует устанавливать так, чтобы Heartbeat-интервал был в три-четыре раза меньше session timeout. Сами значения нужно выбирать не слишком большими, чтобы не увеличивать время до обнаружения «выпавшего» консумера из группы, но и не слишком маленьким, чтобы в случае малейших сетевых проблем, группа не уходила в ребалансировку.

Источник: habr.com

Ещё один гипотетический сценарий: партиций в топике 4, а консумеров в группе 5. В этом случае группа будет стабилизирована, однако участники, которым не достанется ни одна из партиций, будут бездействовать. Такое происходит потому, что с одной партицией в группе может работать только один консумер, а два и более консумеров не могут читать из одной партиции в группе. Отсюда возникает следующая базовая рекомендация: устанавливайте достаточное число партиций на старте, чтобы вы могли горизонтально масштабировать ваше приложение.

Apache Spark

Источник: azul.com

Apache Spark — это универсальный, высокопроизводительный, отказоустойчивый движок, написанный на Scala, для распределенной обработки данных.

Универсальный
Spark способен работать с различными видами обработок данных:
— batch обработка
— ad-hoc запросы
— циклические алгоритмы (нужно запускать часть команд раз за разом итеративно)
— streaming обработка

Высокопроизводительный
Spark способен выполнять вычисления в оперативной памяти, но и с дисковой памятью он работает эффективно. Чтобы применить вычислительные преимущества Spark ему нужно выполняться на кластере серверов, а это значит, что необходима поддержка горизонтального масштабирования — разбиение системы на более мелкие структурные компоненты и разнесение их по отдельным физическим машинам, параллельно выполняющие одно и то же задание.

Отказоустойчивый
Spark в случае нехватки оперативной памяти для вычислений выполнит так называемый spill данных с оперативной памяти на диск тем самым работа Spark приложения не приостановится, но сильно замедлится.

spill — это термин для обозначения процесса перемещения данных из оперативной памяти на диск, а затем обратно в оперативную память.

С какой целью создали Apache Spark?

Spark был создан для устранения ограничений MapReduce у Hadoop за счет:
— обработки данных в оперативной памяти
— уменьшения кол-ва шагов в задании
— повторного использования данных в нескольких параллельных операциях

Spark требуется только один шаг чтения данных с диска в память, а затем выполняются операции трансформации данных и записываются результаты во внешние системы хранения. Использование оперативной памяти и сокращение числа операций с данными приводит к гораздо более быстрому выполнению заданий нежели как у Hadoop.

Spark также умеет повторно использовать наборы данных, помещенные в кэш памяти, чтобы значительно ускорить алгоритмы машинного обучения, которые многократно вызывают функцию в одном и том же наборе данных.

Кэширование данных — это высокоскоростной уровень хранения, на котором требуемый набор данных необходим временно. Доступ к данным на этом уровне осуществляется значительно быстрее, чем к основному месту их хранения. С помощью кэширования становится возможным эффективное повторное использование ранее полученных или вычисленных данных.

В чем основное отличие Spark и Hadoop. Источник: dimensionless.in

Работа Spark с данными в оперативной памяти влечет за собой риски по их потере, так как они будут стерты в случае если пропадет электричество или что-то пойдет не так с самими нодами.

Когда нужно использовать Apache Spark?

Параллельная обработка больших распределено хранящихся наборов данных
— Выполнение интерактивных ad-hoc запросов для изучения и визуализации наборов данных
— Построение сквозных конвейеров (ETL) обработки данных из различных источников
— Обработка потоков данных в режиме реального времени
— Создание и обучение моделей машинного обучения
— Анализ графов и соц сетей

Основные компоненты Apache Spark

Spark Core — основная библиотека, главный движок спарка, который занимается: поддержкой API, управлением памятью, распределением нагрузки, параллельностью запросов, взаимодействием с внешними системами хранения.

Spark SQL and DataFrames — библиотека унифицирует работу со структурированными данными, поддержка языка SQL.

Spark Streaming — библиотека потоковой обработки данных, не совсем реал тайм, а скорее микробатчинг.

RDD (Resilient Distributed Dataset) — самая низкоуровневая единица в иерархии модели Spark, представляет собой распределенный неизменяемый набор данных, который делится на множество частей, обрабатывающихся различными узлами в кластере. Когда использовать RDD:
— когда какая-то внешняя библиотека использует RDD
— детальная оптимизация кода, когда с помощью DataFrame уже не получается
— когда нужно давать Spark’у точные инструкции как нужно выполнять запросы с данными

Spark MLlib — это библиотека фреймворка Apache Spark, позволяющая реализовывать механизм машинного обучения (Machine Learning, ML) и решать задачи, связанные с построением и обучением ML-моделей.

Spark GraphX — это компонент Apache Spark, специализирующийся на анализе графов. Графовые структуры широко используются в различных областях, таких как социальные сети, транспортные системы, биоинформатика и многое другое

Catalyst Optimizer — оптимизатор плана запроса. Catalyst — это дерево, состоящее из узловых объектов. Каталист выполняет следующих шаги для оптимизации:
— Анализ — определить тип каждого передаваемого столбца и действительно ли существуют столбцы, которые вы используете
— Логическая оптимизация (логический план составляет дерево, которое описывает, что нужно сделать). Добавляется predicate-pushdown (добавление фильтра where в условие для внешних источников). Predicate-pushdown поддерживается у JDBC источников, форматов Parquet, Avro, ORC. Помимо Predicate-pushdown существует Projection Pushdown направлено на то, чтобы как можно раньше удалить ненужные столбцы из вычислечний или не извлекать их вообще
— Физическая оптимизация (физический план точно описывает, как нужно делать)Например, логический план просто указывает на то, что необходимо выполнить операцию join, а физический план фиксирует тип соединения (например, ShuffleHashJoin) для этой конкретной операции. Физическим планом часто называют Spark планом, который указывает как логический план будет выполняться на кластере используя разные стратегии физического выполнения и сравнивая их друг с другом посредством модели оценки (cost model). Примером модели оценки может быть сравнение как будут выполняться разные виды Join в зависимости от размера таблицы, насколько большие партиции. Результатом физического плана является RDD трансформации, которые выполняются на каждом узле. Иногда Spark называют компилятором, который принимает запрос через DataFrame, DataSet, SQL API, а потом Spark под капотом компилирует этот запрос в трансформации над RDD
— Генерация кода — создание байт-кода Java для запуска на каждой машине

Pyspark — это API, позволяющее работать с Apache Spark с помощью Python.

Компоненты Spark и языки программирование, которые могут работать со Spark. Источник: moazim1993.github.io

Архитектура Spark

Spark Application — приложение, которое состоит из Spark Driver и Spark Executors.

Spark Driver — компонент, который состоит из одного экземпляра в кластере и отвечает за:
— инициирование исполнения программы
— планирует, распределяет и запускает работу Spark Executors
— формирует план запроса, так называемый DAG (Directed Acyclic Graph)

Spark Executor — компонент, который выполняет все распределенные вычисления. Executorов может быть от 0 до n, отвечает за:
— выполнение кода программы, отправленный Spark Driver’ом
— выполняет все распределенные вычисления
— передает информацию о процессе вычислений

Архитектура Spark. Источник: medium.com

Контейнер — это некий формат пакетирования, упаковки вашего приложения, он позволяет весь ваш код вместе с зависимостями упаковать в некий стандартный формат, чтобы ваше приложение могло запускаться на разных платформах, вычислительных средах.

Spark Session — точка входа для функциональных возможностей Spark:
— позволяет конфигурировать ресурсы, выделяемые для вычислений
— предоставляет доступ ко всем операциям Spark, в том числе и к данным
Spark Session позволяет выделять ресурсы для Spark приложения статически и динамически. При статическом выделяются и резервируются указанное кол-во ресурсов. При динамическом аллоцировании ресурсов они запросятся в нужном размере необходимом для Spark приложения. Помимо этого в Spark есть параметр, который указывает какое кол-во времени ресурсы Spark при динамическом аллоцировании могут простаивать и при достижении этого времени они высвобождаются для решения последующих заданий других приложений.

Архитектура Spark приложения во время исполнения

Spark Application  — представляет собой распределенное приложение для получения максимальной производительности для которого необходим кластер, состоящий из нескольких машин под управлением Resource Manager (RM). Наиболее популярными решениями запуска Spark приложения в кластере являются YARN Resource Manager и Kubernetes cluster. Основное отличие запуска Spark в YARN и Kubernetes в том, что развернутые контейнеры YARNом при необходимости могут самостоятельно запрашивать дополнительные ресурсы для вычислений у Resource Manager, в то время как контейнеры в Kubernetes cluster так не умеют.

Самые популярные Resource Manager для Spark. Источник: medium.com

Spark submit — один из подходов запуска Spark приложения в виде консольной утилиты, которая позволяет запускать приложение в кластере. Утилита читает код, подтягивает конфигурацию и отправляет задачу на деплой в кластер, cluster менеджер выделяет ресурсы и разворачивает приложение.

Spark Submit на кластере YARN. Источник: medium.com

Режимы Deploy Spark приложения

  1. cluster deploy_mode:
    — Driver будет запущен на одной из машин YARN кластера
    — Cluster Deploy mode снижает нагрузку на сеть (network latency), где Driver находится в одном контуре с Executor’ами. Вероятность разрыва связи между Driver’ом и Execut’oром в случае с cluster deploy mode’ом намного меньше, так как Driver и Executor находятся в одной сети.
spark-submit —master yarn —deploy_mode cluster

Обычно deploy_mode cluster используется в PRODUCTION среде, где для запуска Spark Driver выделяется отдельная машина. В случае запуска приложения в cluster deploy_mode режиме с помощью YARN, запускаемый им Application Master = Spark Driver.

Визуализация разворачивания Spark приложения через Spark Submit. Источник: linkedin.com
  1. client deploy_mode:
    — Client mode режим используется по умолчанию
    — Driver запускается на той же машине откуда и производится spark-submit
    — client mode обычно используется для прототипирования, отладки, когда мы хотим увидеть промежуточный результат наших расчетов. Например, работая в JupyterHub’е, сессия запускается всегда с client deploy_mode
spark-submit —master yarn —deploy_mode client

Spark DataFrame и виды операций с ним

Spark Dataframe — наиболее распространенная неизменяемая структура данных в Spark, представляет собой набор типизированных записей, разбитых на блоки. Иными словами — таблица, состоящая из строк и столбцов. Блоки могут обрабатываться на разных вычислительных узлах кластера.

Spark Dataframe — это как партицированная таблица. Источник: nvidia.com

DataFrame поддерживает привычные для работы с данными операторы, такие как:
— select
— filter (фильтрация)
— sort
— withColumn (новые столбцы)
— join (соединение таблиц) и прочие

Какие форматы и источники поддерживает Spark Dataframe. Источник: databricks.com

Spark отчасти напоминает концепт библиотеки Pandas в Python.
Существуют два вида основных операций с Spark Dataframe:
transformation — изменение существующего Dataframe (возвращает другой dataframe, например, с помощью filter, join)
action — метод, который поручает Spark вычислить, записать, вернуть результат цепочек преобразований и трансформаций. Такими методами являются, например: count(), show(), write(), save()

Ленивые вычисления — это стратегия вычисления в Spark, при которой запуск расчетов откладывается до того момента, когда понадобится результат. Необходимо это для сокращения объема вычислений за счет оптимизации и исключения вычислений, результат которых не используется. Реализовано это следующим образом, команды преобразования данных запускаются только после вызова action методов в виде одного оптимизированного плана запроса. Оптимизатор строит наиболее эффективный алгоритм, который уже и запускается.

Операции transformation — ленивые операции, они никаких вычислений не запускают. Операции трансформации делятся на:
Narrow dependency выполняются параллельно над партициями данных (select, filter, drop и не вызывают операции перемешивания (shuffle) данных)
Wide dependency выполняются на сгруппированных данных, собранных из нескольких партиций (.groupBy(), .join(), orderBy(), distinct(), repartition() вызывают операцию shuffle)

Иллюстрация narrow и wide трансформаций. Источник: horicky.blogspot.com

Data shuffle — это процесс перераспределения данных executor’ами для выполнения дальнейшей обработки над сгруппированными данными. Операцию shuffle инициируют широкие (wide) трансформации и прямое использование команды repartition. Результат каждого shuffle записывается на диск.

Принцип работы shuffle данных. Источник: towardsdatascience.com

Операции перетасовки данных являются дорогостоящими и часто являются причиной снижения скорости выполнения распределенного приложения из-за:

  1. Disk IO операции чтения/записи с диска
  2. Network IO — передачи данных по сети между исполнителями или даже между рабочими узлами в кластере
  3. Serialization / Deserialization — конвертация Java объектов, представляющие собой обрабатываемые Spark’ом данные в поток байтов (bytes-stream) для передачи их по сети (deserialization соответственно обратный процесс — конвертация bytes-stream в Java объекты).

Операцию shuffle избежать не получится, но можно минимизировать затраты, например, за счет использования фильтрации данных до широких трансформаций, тем самым уменьшая общее кол-во данных к shuffle

Spark Job, Stage, Task

При выполнении action-операций таких как collect(), count() и т. п. инициируется запуск такого компонента как Spark Job.

Spark Job — задача исполнения графа вычислений (DAG), которая разбивается на Spark Stage, а Spark Stage декомпозируется на Spark Task. Каждая action-операция создает отдельный pipeline вычислений, то есть новый Spark Job. Spark Jobы могут работать последовательно и параллельно, в данном случае как выполнять Jobы решает оптимизатор Spark.

Spark Stage — этапы вычислений, на которые разбивается Job, которые зависят друг от друга и используют общий результат перемешивания/перетасовки (shuffle) данных в рамках этапа расчета. Операция перетасовки (shuffle) данных создает новый stage. Stage’ы могут выполняться как параллельно, так и последовательно и делятся на два типа:
— те операции что не вызывают shuffle у Spark могут быть обработаны параллельно. Например, чтение файла.
— последовательно stage’ы выполняются если у них есть функциональная зависимость, stage’ы построен так, что один stage должен получить результат другого stage’а. Например, джоин двух таблиц, чтение данных может произойти параллельно, а вот сам джоин уже будет идти последовательно

Spark Task — наименьшая исполнительная единица в Spark, которая выполняет серию инструкций, например, чтение данных или фильтрацию. Task’и выполняются внутри Executor’ов. Таски — юниты исполнения, которые распределяются между экзекьюторами, каждая таска дается на выполнение одному экзекьютору и выполняется над одной партиции данных. Таким образом один экзекьютор с 16 ядрами (cores) может обрабатывать 16 партиций параллельно.

Схематичное описание работы Spark Job, Stage, Task. Источник: medium.com

Партиционирование в Spark

Apache Spark предназначен для обработки больших данных (Big Data), и партиции являются одним из способов это сделать. К плюсам партиций можно отнести:
— быстрый доступ к данным;
— возможность производить операции на меньших датасетах;

Действительно, хранить в оперативной памяти огромный датасет может быть не эффективно, однако его заранее можно разбить на меньшие части, и уже работать с ними. К тому же фильтрационные запросы подразумевают партицирование

К минусам партиций можно отнести:
— наличие большого числа партиций (операции ввода-вывода медленные);
— не решают проблему неравномерности данных (партиции могут быть разного размера)

Существует два вида операции с партиционированием, одни выполняются в оперативной памяти, а другие на диске.

Операции партиционирования в памяти

repartition() — принудительный shuffle данных, на вход подается два аргумента, кол-во партиций и список колонок по которым делается репартиционирование. Это команда делает следующее, одинаковые ключи отправляет на одинаковые executor’ы. С помощью этого метода можно увеличить количество партиций тем самым увеличив уровень параллелизма в кластере Apache Spark. Repartition может хорошо помочь в случае неравномерного распределения или перекоса данных.

Принцип работы метода repartition. Источник: medium.com

coalesce() — операция оптимизации, которая только уменьшает число партиций наиболее эффективным образом и при этом минимизирует количество перетасовок shuffle. Не может увеличить количество партиций, т. е. повысить уровень параллелизма в кластере Apache Spark. Данная операция может полезна для разделения данных на маленькое кол-во партиций без full shuffle, но есть риск, что данные могут разделиться неравномерным образом.

Принцип работы метода coalesce. Источник: medium.com

Операция партиционирования на диске
Партиция на диске выполняется с помощью вызова метода partitionBy. При создании партиций на диске они будут сгенерированы на основе уникальных значений столбца(ов), по которому(ым) происходит разбиение. Иными словами, partitionBy работает как groupBy, только вместо групп на диске создаются файлы с партициями.

Разница между операциями в памяти и операциями на диске
partitionBy изменит структуру папок, то есть по сути это аналог партиций Hive, где под каждое значение партиционированной колонки создастся своя папка в HDFS. В то время как repartition() или coalesce() оперируют кол-вом файлов, которые будут записаны в папку HDFS.

Best Practices

  1. Использовать динамическую аллокацию ресурсов вместо статической. Приложение будет запрашивать ресурсы по мере необходимости и возвращать неиспользуемые ресурсы кластеру. На кластере могут работать тысячи пользователей, при динамической аллокации распоряжение ресурсами происходит более грамотно и можно не переживать за оставленную без присмотра spark-сессию.
    Про выделение ресурсов:
    — Для Spark Driver’а достаточно 1 ядра (core), 1 гигабайт оперативной памяти
    — Если много данных, то нужно выделять много памяти на один Spark Executor, 1-2 ядра и увеличивать кол-во Executor
    — Если мало данных, но много операций, трансформаций с данными, то нужно в приоритет ставить кол-во ядер, но меньше выделять памяти
    — Параллелизм в Spark определяется по формуле: кол-во экзекьюторов умноженное на кол-во ядер одного экзекьютора, то есть, если у нас spark.executors.cores = 2 и spark.executors.instances = 10, то параллельно 1 экзекьютор может обработать 20 партиций
    — Все Executor’ы используют общую оперативную память, то есть она делится между кол-вом ядер этого экзекьютора. Если spark.executors.memory = 8 и spark.executors.cores = 2, то на каждое ядро экзекьютора будет по 4 ГБ памяти
    — Существуют операции, которые возвращают данные в Spark Driver collect(), reduce(), count(). Важно понимать, что зачастую на Spark Driver выделяют ограниченное кол-во ресурсов, поэтому данные большого размеры стягивать нельзя. Перечисленные операции используются для получения результата на небольшом по размеру DataFrame. Помимо перечисленных методов данные стягиваются на Spark Driver при broadcast join. При broadcast join датафрейм стягивается на Spark Driver, а дальше его копии рассылаются на каждый Executor. Если датафрейм очень большой, то это будет неэффективно
  2. Следить за кол-вом дорогостоящих операций.
    Дорогостоящие операции это:
    — чтение исходных данных с диска. Чтобы существенно увеличить производительность в этом пункте необходимо использовать фильтры по полям партицирования таблиц, тем самым мы сократим кол-во перемещений данных по сети между Executor’ами, пользоваться преимуществами столбчатого хранения (ORC/Parquet), а именно считывать только нужные для расчетов поля
    — мониторить за вытеснением данных из памяти экзекьютора на локальный диск (spill). Вытеснение данных происходит на локальный диск Executor. После вытеснения данных нужно их снова считывать с диска. Помогает выделить больше памяти, большее кол-во Executor
  3. Обнаружение вытеснения данных с памяти на диск
    — Событие вытеснения на диск можно увидеть в логах Executor
    — нужно выделять достаточное кол-во памяти на процесс (executor.core) + следить за объемом данных и перекосами. Бывает что данные между экзекьюторами распределяются не равномерно, один экзекьютор читает большую партицию, а остальные маленькие партиции и после завершения работ с маленькими партициями остальные ждут окончания расчета у экзекьютора с большой партицией.
    минусы shuffle:
    — сериализация/дисериализация данных
    — пересылка данных по сети
    — возможны вытеснения на диск
  4. Анализировать план запросов с помощью explain.
    Оператор возвращает подробную информацию о том, какой план исполнения будет выбран оптимизатором для выполнения скрипта. Запросы нужно читать снизу вверх, смотреть что применились фильтры по партицированию.
  5. Грамотно использовать кэширование данных
    Кэширование в Spark — механизм сохранения промежуточных вычислений в памяти экзекьюторов. Экзекьюторы сохраняют обрабатываемые партиции данных в специально выделенную область памяти. Кэширование нужно использовать если один тот же датафрейм используется в алгоритме несколько раз, необходимо это чтобы дважды или более раз не считывать данные с диска. Кэширование происходит с помощью метода cache(). Лучшими практиками является создавать новую переменную под кэшированный датафрейм и хранить в нем только необходимые столбцы, например:
cache_df = df.cache()

Для удаления объекта из кэша используется метод unpersist().

  1. По возможности избегать использоваться UDF
    UDF — это реализованные пользователем функции, которые не содержатся во встроенных модулях Spark. В PySpark использование UDF ведет за собой существенную деградацию производительности приложений.
    — сериализация/дисериализация данных между JVM и Python интерпретатором
    — черный ящик для оптимизатора запросов. Оптимизатор запроса не знает ничего про UDF и не сможет оптимизировать план запроса
    — Если нужны UDF, то нужно обратить внимание на Pandas UDF
  2. Broadcast join должен производится только на небольших датафреймах
    Основное требование — один из датафреймов должен полностью перемещаться в память и драйвера, и экзекьютора. BroadcastHash join производится по хэшам ключей. Для построения HashMap, которая будет разослана на все экзекьюторы, Spark сначала вытянет все данные на драйвер. Для определения размера датафрейма Spark опирается на статистику по таблице, хранящуюся в том же Hive, а она может быть некорректной, что приводит к переполнению ресурсов Spark Driver
  3. При записи в HDFS не нужно создавать много маленьких файлов
    При проведении трансформаций spark по умолчанию создает 200 партиций. При записи на диск каждая партиция будет записана в отдельный файлик. Особенности архитектуры HDFS, каждый блок занимает оперативную память на NameNode, а она лимитированная. Лучшими практиками в данной ситуации будет:
    — кол-во файлов не превышает параллелизм при чтении
    — размер файла меньше размера блока на Data Node и все файлы примерно одинакового размера

Apache Hadoop

Источник: wikimedia.org

Hadoop — проект фонда Apache Software Foundation, созданный в 2005 году и предназначенный для эффективного хранения и обработки больших наборов данных объемом от сотен гигабайт до петабайтов.
Под словом Hadoop может подразумеваться:
— Несколько сервисов, составляющих ядро Hadoop (YARN, HDFS, MapReduce)
— Вся экосистема сервисов Hadoop
— Кластер под управлением Hadoop

Предпосылки создания Hadoop

Из-за значительного роста обрабатываемых данных (терабайты, петабайты) действующие на тот момент системы уже эффективно не справлялись с обработкой такого потока данных. Помимо этого необходимо было где-то разместить (хранить) такое кол-во данных без потери информации, с сохранением ее постоянной доступности. Так появилась потребность в распределенном хранилище данных. Чтобы обеспечить высокую пропускную способность большого кол-ва данных необходимо было распараллелить вычисления эффективным и отказоустойчивым способом на кластере машин. Кроме того появление новых источников нетабличных данных как, например, логи, содержимое веб-страниц, тоже являлось предпоссылкой создания нового решения обработки и хранения данных, так как сложно решить данные задачи посредством обычных реляционных БД. Одним из основных достоинств Hadoop является то, что он проектировался так, чтобы стабильно работал на кластерах серверов не премиального класса.

В то время при разработке приложений для распределенной обработки данных возникали следующие проблемы:
1) В кластере серверов сложно определить какой сервер является лидером, то есть сервер, который управляет всеми остальными
2) Проблема координации кластера
3) Отсутствие достаточной отказоустойчивости. При реализации сценария выхода из строя одной ноды не было четкого плана действий, механизмов у кластера для автоматического восстановления работы системы или продолжения функционирования системы без отказавших нод
4) Проблема консистентности данных. Сложность разработки приложения с распределенной обработкой
5) Отсутствие инструмента по управлению приоритетов обработки данных, по управлению вычислительными ресурсами кластера (CPU, RAM, HDD) в том числе если ресурсы запрашивают сразу несколько приложений

Поэтому чтобы решить все вышеперечисленные проблемы был создан Hadoop.

Ядро Hadoop 2.0

Источник: www.dotnettricks.com

MapReduce — это фреймворк для обработки наборов данных с помощью параллельного распределенного алгоритма
HDFS — файловая система, предназначенная для хранения файлов больших размеров, поблочно распределенных между узлами вычислительного кластера
YARN — модуль, отвечающий за управление ресурсами кластеров и планирование заданий
Others data processing — остальные фреймворки по процессингу данных, например, Apache Spark, Apache Kafka, которые позволяют обрабатывать данные с помощью новых оптимизаций, что приводит к значительному приросту производительности по сравнению с MapReduce. Помимо этого другие фреймворки могут работать с разными видами нагрузок данных, например, streaming, циклические алгоритмы, в то время как MapReduce предназначен для batch обработки.

Apache ZooKeeper

Apache ZooKeeper — инструмент, который следит за синхронизацией, координацией, состоянием всего кластера распределенных приложений. ZooKeeper позволяет Big Data разработчику сосредоточиться над логикой своего приложения, вся координация сервисов ложится на плечи ZooKeeper. Apache ZooKeeper представляет из себя консистентную файловую систему небольшого размера.

Источник: dataview.in

Архитектура HDFS

Источник: linkedin.com

HDFS (Hadoop Distributed File System) — это распределенная append-only файловая система для хранения файлов больших размеров, поблочно распределенной по узлам вычислительного кластера. Любая файловая система состоит из иерархии каталогов с вложенными в них подкаталогами и файлами.

NameNode — это отдельный сервер, единственный в кластере для управления пространством имен файловой системы, хранящий дерево файлов, а также мета-данные файлов и каталогов. Данный сервер отвечает за открытие и закрытие файлов, создание и удаление каталогов, управление доступом со стороны внешних клиентов и соотвествие между файлами и блоками, которые реплицируются на остальные узлы данных. Сервер NameNode раскрывает расположение блоков данных на машинах кластера.

Secondary NameNode — отдельный сервер, единственный в кластере, который копирует образ HDFS (fsimage) и лог транзакционных операций (чтения, записи, удаления и т. д.) с файловыми блоками, применяет изменения, накопленные в логе к образу HDFS, а также записывает его на узел NameNode. Secondary NameNode необходим для быстрого ручного восстановления NameNode в случае его выхода из строя.

fsimage — образ файловой системы HDFS, в котором хранятся логи операций, которые происходили на NameNode. Так как вычитывать все операции логов над блоками за глубокую историю HDFS неэффективно, значит нужно объединять (merge) предыдущую версию логов операций с файловой системой с новоприбывшей пачкой логов. Данный процесс и происходит в fsimage, где он периодически обновляется новыми логами. В случае выхода из строя NameNode и его успешного восстановления, то чтобы ему вернуться к актуальному состоянию файловой системы HDFS происходит чтение fsimage с последнего объединения логов в Secondary NameNode.

DataNodes — множество серверов в кластере, отвечающие за файловые операции, хранение и работу с блоками данных. Основная необходимость DataNode — это чтение и запись данных, выполнение команд от NameNode по созданию, удалению и репликации блоков, а также периодическую отправку сообщений о состоянии обработки запросов на чтение и запись, поступающих от клиентов файловой системы HDFS. Приложению работающему с Apache Hadoop не достаточно иметь доступ только к NameNode, она должна иметь доступ к каждой DataNode.

В HDFS есть 3 места, где могут храниться его настройки:
— глобальные настройки для всего HDFS кластера, это те настройки, которые применяются по умолчанию, если мы не указали что-то отличное. Одной из таких настроек — это размер блока файла.
— настройки для конкретной папки HDFS
— настройка для конкретного файла

Источник: phoenixnap.com

Блоки и файлы
Файл — это только запись в мета-данных на NameNode, содержимое же файла хранится в нескольких блоках одинакового размера на DataNode’ах. Основное назначение блоков в том чтобы сделать процесс чтения и записи предсказуемым. Сложно управлять одним большим файлом, переместить с одного сервера на другой, так как он может быть большим по объему. Когда нам нужно прочитать этот файл, то мы смотрим в справочник метаданных на NameNode и узнаем на каких DataNode’ах находятся блоки этого файла. Важно понимать, что DataNode’ы не знают что за блоки данных они хранят, к какому файлу они относятся и в случае выхода из строя NameNode’ы собрать блоки в единый файл не получится.

Размер блока для каждого файла можно настраивать в момент записи. Если у нас настроено, что 1 блок хранит 64 МБ данных, то файл размером 65 МБ будет распределен на 2 блока, где 1 блок имеет 64 МБ данных, а второй 1 МБ данных. Блоки как уже понятно нарезаются по размеру и все блоки кроме последнего имеют один и тот же размер. Размер блока после записи файла поменять нельзя, так как блоки уже распределены на разные сервера.

В HDFS существует проблема записи мелких файлов. NameNode хранит файлы в оперативной памяти, где как раз и лежит информация о связи файла с блоками и чем больше файлов, тем быстрее закончится место, поэтому важно следить за записью файлов и не плодить их с помощью мелких для экономии места в оперативной памяти NameNode. Примерный размер для хранения каждого такого файла равен 150 байт. Поэтому небольшие файлы в HDFS хранить неэффективно: много мелких файлов будут занимать много места на NameNode, больше, чем требуется для хранения их содержимого. Для сети в случае падения DataNode’ы проще дореплецировать 1 раз размер блока в 1 ГБ, чем 1000 мелких файлов в 1 МБ.

У каждого блока данных есть отдельный файл, который хранит 3 контрольные суммы (3 суммы нужны чтобы избежать коллизий). Этот файл с хэш суммами нужен чтобы проверить, что файл блока не битый. В HDFS есть процесс, который ходит по DataNode’ам и пересчитывает контрольные суммы файлов, это необходимо чтобы проверить, что блок не сломался или блок не изменен руками администратора.

Источник: stackoverflow.com

Репликация — процесс создания копии данных с целью обеспечения сохранности данных, в случае потери одной реплики у нас всегда оставалась запасная, резервная. Создавая файл в HDFS можно указать размер файла (64 МБ по умолчанию) и кол-во реплик, по умолчанию 3. Для достижения максимальной надежности 2 и 3 реплика помещаются на разные стойки узлов данных. HDFS следит за тем, что если какие-то файлы у него недореплицированы, то он сам автоматически начинает их реплицировать до нужного количества.

Источник: hadoop.apache.org

Чтение данных в HDFS
Клиент HDFS запрашивает информацию у NameNode через специальный интерфейс Distributed File System о файле, проверяет существует ли он, какие права доступа. В случае прохождения всех этапов проверки NameNode выдает клиенту информацию о файле, где находятся его блоки на DataNode. Получив эту информацию клиент с помощью интерфейса FSDataInputStream идет на каждую DataNode’у и выкачивает нужную информацию.

Источник: javatpoint.com

Запись данных в HDFS
Клиент HDFS иницирует запрос на запись файла к NameNode. NameNode со своей стороны проверяет существует ли такой файл и есть ли у клиента права на запись в этот файл. Если проверки прошли успешно, то NameNode в своих мета-данных создает запись для файла. Далее клиент делит файл на несколько пакетов согласно установленному размеру блока и управляет ими в форме очереди данных. HDFS последовательно отправляет пакеты на запись в DataNode’у, создавая новые блоки. Запись происходит в 1 DataNode, дальше идет репликация записанного блока на две другие DataNode’ы, как только репликация завершается клиенту возвращается подтверждение (ack packet) того что запись файла завершена успешно. Важно отметить, что клиенты пишут напрямую в DataNode’ы, минуя Namenode’у. Благодаря этому обеспечивается высокая надежность и пропускная способность.

Есть возможность либо удалить, либо записать файл заново, либо записать что-то в конец файла причем при записи в конец файла создается отдельный новый блок, в существующие блоки, даже если их размер не достигается предельного размера согласно настройкам HDFS, дозаписать что-то не получится. Аналогично и про запись в середину файла, такой возможности не предоставляется.

Источник: javatpoint.com

Высокая доступность
NameNode (standby) — это сервер, который необходим в случае если активная NameNode в кластере выведена из строя. В Journal Node пишутся все изменения, а NameNode (standby) читает эти изменения и становится управляющим NameNode кластера в случае потери активной NameNode.

Источник: doc.hcs.huawei.com

YARN

YARN (Yet Another Resource Negotiator) — модуль, отвечающий за управление ресурсами кластера и планирование заданий. YARN работает с Java контейнерами, похожая сущность что у Kubernetes и Mesos, но YARN больше заточен для экосистемы Hadoop. Основная идея YARN состоит в том, чтобы предоставить абстракцию управления ресурсами и запуск/мониторинг задач двум отдельным компонентам: глобальному менеджеру ресурсов Resource Manager и локальному серверному менеджеру Node Manager.

Resource Manager — планировщик ресурсов, абстрагирующий вычислительные ресурсы кластера и управляющий их предоставлением. Для выполнения заданий он оперирует контейнерами.

Resource Manager выполняет следующие задачи:
— принимает задание от Client
— смотрит на доступные ресурсы и выбирает сервер, на котором есть доступные ресурсы
— запускает на сервере Application Master для того, чтобы следить как выполняется задание, успешно ли оно выполняется
— передает задание Application Master
— выделяет по запросу Application Master контейнеры с ресурсами

Application Master — главный контейнер в YARN, который отвечает за координацию всех остальных контейнеров. Если выходит из строя Application Master, то все остальные контейнеры тоже отключаются. На Application Master запускается JAR файл приложения, переданный от клиента, дальше Application Master запрашивает у Resource Manager запуск контейнеров, Resource Manager опрашивает на каких Node Manager есть свободные ресурсы, определившись у каких серверов есть необходимые ресурсы Application Master отправляет запросы этим Node Manager для запуска контейнеров. Само приложение внутри контейнера может иметь свой планировщик ресурсов, который может запрашивать дополнительные ресурсы у Resource Manager.

Node Manager — процесс, работающий на каждом вычислительном узле и отвечающий за запуск контейнеров приложений, мониторинг использования ресурсов контейнера (процессор, память, диск, сеть) и передачу этих данных Resource Manager.

Разделение ресурсов
YARN может приоритезировать какому приложению в первую очередь выделить ресурсы. Настроить приоритезацию можно следующим образом:
— Отдельная очередь для долгих тяжелых задач
— Отдельная очередь для мелких ad-hoc запросов
— Отдельная очередь для обучения ML моделей
— Отдельная очередь на каждый отдел и т. д.

Источник: edureka.co

MapReduce

MapReduce — это фреймворк для обработки наборов данных с помощью параллельного распределенонго алгоритма.

Вычислительная модель MapReduce состоит из 3 этапов:
map — каждый узел кластера, хранящий данные, применяет к каждой записи некоторую функцию и выдает результат в формате ключ-значение
shuffle — перераспределение данных по сети по вычислительным узлам на основе ключей из этапа map
reduce — применение агрегирующей функции к сгруппированным данным благодаря этапу shuffle и расчет финального результата

Достоинства такого фреймворка:
— Можно обрабатывать огромные объемы данных
— Отказоустойчивость при обработке
— Data Locality — данные хранятся на тех же узлах, где происходят вычисления

Недостатки:
— Частые операции чтения и записи на диск
— Ограниченная область применения

Источник: todaysoftmag.com

Hive

Hive — пользовательский интерфейс, где написанный клиентом SQL запрос преобразуется в MapReduce job и запускается на Hadoop. Благодаря Hive можно выполнять запросы к слабоструктурированным данным, для запросов используется специальный язык HiveQL. У Hive нет своего хранилища, но он имеет отдельную БД Hive Metastore для хранения метаданных о таблицах (структур таблиц).

Как выглядит архитектура Hive, клиенты отправляют SQL запрос на выполнение, запрос принимает HiveServer2, который отвечает за взаимодействие с клиентами. После запрос парсится с помощью компилятора для проверок наличия такой таблицы в Hadoop, следующим этапом запрос передается оптимизатору, после оптимизации запроса результат оптимизатора передается на исполнение в виде MapReduce job. Помимо MapReduce Hive может выполняться поверх Apache Spark. К сожалению, Hive — это все же не полноценная замена классической СУБД, также он не про скорость выполнения запроса, а про всеядность обработки больших объемов данных с понятным для пользователей интерфейсом, так как запуск job в YARN с процессом его приоритизаций, выделения ресурсов не быстрое действие.

Источник: interviewbit.com

Apache Hive Beeline — это клиент Hive для запуска HiveQL запросов с помощью командной строки.

Таблица в структуре Hive
Таблица в Hive представляет из себя аналог таблицы в классической реляционной БД. Основное отличие — что данные Hive’овских таблиц хранятся просто в виде обычных файлов на HDFS. Это могут быть обычные текстовые csv-файлы, бинарные sequence-файлы, более сложные колоночные parquet-файлы и другие форматы. Но в любом случае данные, над которыми настроена Hive-таблица, очень легко прочитать и не из Hive

Таблицы в hive бывают двух видов:
— классическая таблица, куда данные добавляются при помощи HiveQL
— внешняя таблица, данные в которую загружаются внешними системами, без участия Hive. Для работы с  внешними таблицами при создании таблицы нужно указать ключевое слово EXTERNAL, а также указать путь до папки, по которому хранятся файлы

Партиции в Hive
Так как Hive представляет из себя движок для трансляции SQL-запросов в mapreduce-задачи, то обычно даже простейшие запросы к таблице приводят к полному сканированию данных в этой таблицы. Для того чтобы избежать полного сканирования данных по некоторым из колонок таблицы можно произвести партиционирование этой таблицы. Это означает, что данные относящиеся к разным значениям будут физически храниться в разных папках на HDFS. Партиции в Hive это не тоже самое что партиции в Oracle или другой реляционной СУБД, так как отдельные секционированные части хранятся в разных файлах на HDFS. Партиционирование в Hive является нативным функционалом и не требует создания подтаблиц как в случае с реляционными СУБД. Все партиции создаются автоматически и распределяются в каталогах HDFS.

MSCK REPAIR TABLE
У Hive нет контроля над хранилищем HDFS и если случилось так, что в папку таблицы были записаны данные отличным от Hive инструментом, например, Apache Spark, то Hive не увидит эти изменения. Чтобы избавиться от несогласованности данных в этой таблице необходимо выполнить команду MSCK REPAIR TABLE. Благодаря этой команде Hive проанализирует таблицу и новые партиции станут доступны в Hive.

Форматы хранения данных в Hadoop

Основными форматами хранения в Hadoop являются:
— Parquet
— ORC
— Avro

Parquet и ORC — это колоночный формат хранения данных, а Avro является построчным форматом. Одним из плюсов перечисленных форматов в том, что в них записана схема, то есть указаны какие поля есть в таблице, какого они типа. Помимо этого плюсом колоночного формата хранения является то, что каждый столбец таблицы — это отдельный файл в HDFS, что позволяет гораздо быстрее считывать данные по сравнению с построчным хранением данных. В случае если нам нужна высокая скорость записи широких по столбцам таблиц, то необходимо присмотреться на строковый формат хранения Avro.

При выборе между Parquet и ORC нужно понимать в чем их принципиальные отличия:
— Формат Parquet лучше хранит вложенные данные, например, колонки формата json
— У Parquet в метаданных колонок хранятся разные статистики по типу max, min, count, это может значительно сократить время подсчета агрегатов
— ORC поддерживает ACID-свойства
— ORC эффективнее сжимает данные чем Parquet

Реляционные и нереляционные БД

Источник: bigdataschool

Реляционные БД

SQL подход — это семейство реляционных баз данных, основанное на отношениях (связях) таблиц друг с другом. Каждая таблица представлена в виде столбцов и строк. Столбец имеет свой предопределенный тип данных, в каждой ячейке значение. Строка хранит набор связанных значений, относящихся к объекту. Для определения уникальности строки существуют уникальный идентификатор (primary key). Строки из нескольких таблиц могут быть связаны посредством внешних ключей (foreign key).

Отличительные черты реляционной модели:
— подходит для решения большинства существующих задач
— запись и чтение структурированных данных
— данные связаны в виде логических отношений таблиц
— в таблицах есть строки и столбцы, где каждый атрибут имеет свой тип данных, а в ячейке свое значение
— есть уникальный ключ (primary key) для определения уникальности записи
— есть внешний ключ (foreign key) для отношения (связи) строк одной таблицы с строками другой таблицы
— необходима фиксированная схема (schema), где описана структура таблицы (наименование полей, тип полей и т. п.) наложенные на таблицу ограничения (constraints, checks, excludes)
— благодаря поддержки ACID свойств обеспечивается целостное хранение и согласованность данных, высокая отказоустойчивость и надежность
— поддержка языка SQL для манипуляций с данными

Недостатки реляционных БД:
— сложно горизонтально масштабироваться
— невозможно хранить данные с заранее неизвестной структурой
— менее эффективны в обработке больших объемов данных (террабайты, петабайты), чем нереляционные БД

Популярными представителями реляционных СУБД являются: Oracle, MySQL, MSSQL, Postgres

Нереляционные БД (NotOnlySQL)

NoSQL подход — это семейство нереляционных баз данных, реализующее отличный от традиционной табличной модели представления данных, где управление данными осуществляется не только с помощью языка структурированных запросов SQL.

Отличительные черты нереляционной модели:
— используются для решения узкоспециализированных задач
— запись и чтение неструктурированных данных
— гибкость, отсутствует фиксированное описание схемы из-за хранения данных без строгой структуры в виде документов, ключ-значений, графов и т. д.
— взамен ACID принципов используются BASE принципы, которые основаны на CAP теореме
— возможность горизонтального масштабирования путем добавления нового сервера в кластер
— поддержка шардирования
— высокая доступность и отказоустойчивость благодаря репликации данных
— обработка больших объемов неструктированных данных с низкой временной задержкой
— поддержка собственных SQL-подобных языков запросов, RESTful-интерфейсов, API и сложных типов данных

Недостатки нереляционных БД:
— отсутствие сильной целостности данных приводит к случаям чтения неактуальной информации, реплика не всегда может успеть обновиться актуальными данными
— сильная привязка к специфике внутреннего языка запросов конкретной СУБД, когда в реляционной БД есть SQL, который универсален для всех реляционных баз. Это приводит к сложности перехода от одной нереляционной БД к другой

Виды NoSQL БД:
Ключ-значение (key-value)

Источник: cloud.yandex.ru

В БД данного типа записи хранятся в парах ключ-значение, где ключ — уникальный идентификатор. Key-value БД используются для систем, где очень важна скорость и данные представлены не в сложном виде. Такие БД хорошо подходят например, для хранения кэша данных, пользовательских сессий, корзин в интернет магазине.

Популярными представителями являются: Redis (Remote Dictionary Server), DynamoDB, Memcached

Колоночные (column family store)

Стандартная строковая СУБД. Источник: clickhouse.com
Столбцовая СУБД. Источник: clickhouse.com

В колоночных БД данные каждой колонки хранятся отдельно независимо друг от друга (для каждого столбца создается свой файл). Такой принцип позволяет считывать с диска только данные тех столбцов что указаны в запросе — это ускоряет процесс чтения данных из больших таблиц, предназначенных для аналитических целей. Также преимуществом такого хранения является возможность сильно сжимать данные, что значительно экономит место на диске. Недостатком является выполнение операций над строками, у такого типа БД они более затратные.

Популярными представителями являются: Cassandra, Apache Hbase, ClickHouse

Документоориентированные (document-oriented store)

Схема документоориентированного БД. Источник practicum.yandex.ru

Данные в этом типе БД хранятся в виде документов в формате JSON, YAML, XML. Документы складываются в коллекции, а коллекции группируются логически тем самым создается иерархия. Преимуществом такой БД является гибкость, значения и структура документов может меняться в процессе разработки. Такие БД часто применяются для каталогов товаров в маркетплейсах, в соцсетях, платформ с блогами и видео, геоаналитики.

Популярными представителями являются: MongoDB, Amazon DynamoDB, CouchDB

Бонус

Классное и креативное объяснение NoSQL простым языком:

Традиционная БД против MPP БД

Предисловие

В данной статье разберу чем традиционные БД отличаются от MPP, в каких задачах достаточно иметь традиционную, а в каких MPP значительно лучше.

Симметричная многопроцессорная архитектура SMP

Традиционные СУБД как Oracle, Postgres, MySQL, MSSQL используются в симметричной многопроцессорной архитектуре (SMP).
SMP архитектура — это share-everything архитектура, где несколько процессоров сервера одинаковой производительности совместно используют оперативную память, что позволяет быстро обмениваться данными между процессами, жесткие диски для хранения данных. Каждый процессор может решать разные задачи причем делает это независимо друг от друга.

SMP архитектура

Преимущества SMP архитектуры относительно СУБД:
— Хорошо масштабируется по вертикали, добавляются дополнительные ресурсы сервера (CPU, RAM, HDD), тем самым повышая скорость отработки запросов
— Один сервер легче администрировать и обслуживать (управлять правами доступа, делать резервное копирование, накатывать обновления для СУБД и т. д.)
— Высокая производительность на небольших объемах данных, так как данные хранятся в одном месте не нужно передавать их по сети
— Равномерное распределение нагрузки на процессоры сервера
— Отлично подходит для обработки постоянного потока (real time) небольших транзакций характерных для OLTP систем
— Благодаря грамотному использованию индексов достигается высокая скорость чтения данных
— Отказоустойчивость, выход из строя одного процессора не заблокирует работу всего сервера

Недостатки:
— Совместное использование, конкуренция за ресурсы сервера пользователями СУБД
— В вертикальном масштабировании можно упереться в потолок, где добавление новых компонент (CPU, RAM) не будет давать прирост серверу в производительности
— Отсутствие горизонтального масштабирования
— Медленная обработка аналитических-OLAP запросов

Традиционные СУБД с SMP архитектурой хорошо показывают себя в
OLTP-системах, где важно обрабатывать постоянный поток (real-time) небольших по размеру транзакций с бОльшей долей операций вставки. Поэтому традиционные СУБД используются в микросервисах, веб-сайтах, CRM/ERP системах, в банках при обработке платежных транзакций.

Массивно-параллельная архитектура MPP

Массивно-параллельная архитектура — это зачастую shared-nothing архитектура, где каждому серверу выделены свои процессоры, своя оперативная память, а иногда и жесткие диски. Для общения и передачи данных между серверами все сервера подключены в одну сеть. Помимо этого в MPP СУБД встроена автоматическая разбивка данных по серверам под названием sharding. Если говорить грубо, то MPP — это несколько серверов, которые параллельно трудятся для решения одной задачи. К распространенным MPP СУБД можно отнести следующие продукты: ClickHouse, Greenplum, Vertica, Teradata.

MPP архитектура

Преимущества MPP архитектуры относительно СУБД:
— Легкая и доступная горизонтальная масштабируемость за счет добавления новых серверов в кластер
— Быстрая обработка аналитических-OLAP запросов за счет шардирования и партицирования
— Шардирование — разделение объектов базы данных на разные сегменты. Благодаря шардированию осуществляются распределенные вычисления. Шардирование в комплексе с shared-nothing концепцией дают хороший буст в производительности. Шардирование происходит благодаря дистрибуции данных по ключу, при правильном выборе ключа дистрибуции данные распределяются по сегментам равномерно, что играет ключевую роль
— Партицирование — разделение больших таблиц на секции, влечет за собой повышение производительности запросов путем снижения объема сканируемых данных, читаем только нужные секции. Также облегчает обслуживание таблиц, например, проще и эффективнее удалять, перемещать секции чем всю таблицу целиком
— Идеально подходит для корпоративных хранилищ данных
— Повышенная отказоустойчивость, отсутствует единая точка отказа. При выводе из строя одного из серверов кластера, работа СУБД не прерывается
— Возможность работать с несколькими источниками, например, разными OLTP системами, другими хранилищами или озерами данных

Недостатки:
— Высокие требования к сети, объединяющая сервера в один кластер. Сеть должна стабильно работать и иметь высокоскоростное соединение для передачи данных
— Низкая производительность для OLTP нагрузок с постоянным потоком транзакций
— Возможность возникновения перекоса данных на серверах кластера из-за неверно подобранного ключа дистрибуции, то есть в одном сегменте потребуется обработать значительно больше данных чем в других — это приведет к низкой производительности и нехватки памяти

В MPP СУБД фокус идет на аналитику больших данных (терабайты, петабайты), а значит предназначена решать задачи OLAP нагрузки, например, для построения корпоративного хранилища данных с обеспечением пользователей регулярными отчетами (МСФО, РСБУ), предиктивной аналитикой, подготовки данных с целью визуализации через BI инструменты.

История создания трека Inferno

06.11.2023 года в составе EP на британском лейбле JOOF Recordings вышел наш совместный трек с Enlusion Inferno. Сегодня расскажу вам историю его создания.

Обложка трека с Beatport

The Beauty of the Past

Изначально трек не планировалось издавать как совместную работу. В сентябре 2022 года я закончил свой новый трек под кодовым названием The Beauty of the Past, где был такой трансовый вайб с несколькими дорожками прогрессив бэйс лайна, с похожей лид партией из моих прошлых успешных работ, например, The Shockwave, аранжировку трека можно легко узнать с моего Terminate, да и что тут скрывать, хотелось написать трек в стиле крутого Slam Duck, вот наглядные примеры:

Моя версия в целом мне очень нравилась особенно своей атмосферой, прогрессией и сочетанием драмсов с басс линией. И вот я решил скинуть данный трек Enlusion и Daniel Lesden, чтобы узнать их мнение о треке, может уже пора релизить думал я, но рано было радоваться 😄

Вот так звучала первая версия Inferno The Beauty of the Past:

И вот первые впечатления от коллег:

Daniel Lesden: Мне какого-то лида не хватает, основной темы. Сейчас это по сути филер

Enlusion: Трек звучит приглушённо, все ключевые партии играют фоном вместо того, чтобы быть на переднем плане. Думаю, что он оживёт, если ты пересведёшь его. Мне по идеям нравится трек

Многочисленные правки с моей стороны не приводили в восторг никого.

Enlusion: Стало лучше. До ямы хорошо нагнетается напряжение, но после ямы ожидаешь усиления. Этого не происходит, трек просто начинается ещё раз и потом заканчивается, теперь нужно усилить напор после ямы, дать более мощный заряд

В конечном итоге произошло это:

Enlusion: Короче я готов закончить этот трек

Трансформация The Beauty of the Past в Inferno

У Кирилла (так зовут Enlusion) всегда в треках получались мастерские прогрессив бэйс лайны, поэтому первым же делом он решил заменить этот элемент, послушайте такую версию:

Сразу заметно, что трек сильно подрос в качестве, он стал более полным и звучать громче, Кирилл отлично постарался со сведением трека, но все же мне не нравилась добавленная им лид партия.

Daniel Lesden: Мне кажется тут бас-линия нужна более качающая. Мб тек-хаусовая? Кажется что трек пытается быть драйвовым пик-таймовым, а такой отрывистый короткий бас больше к прогу подходит

Кирилл решил поэксперементировать и вписать тек-хаусовый движок:

Была и такая версия, Кирилл любит потрудиться 😁

В это время я решил попробовать сделать свою техно версию трека, основываясь на изначальный The Beauty of the Past, Кириллу очень понравилась отсюда бочка, но в итоге пришли к мнению, что не стоит допиливать.

Кирилл не останавливается и продолжает потеть над треком, здесь он отказывается от тек-хаусового движка и решает добавить наикрутейший риз и мид-перк басс, зацените:

Daniel Lesden: Открытых хэтов очень не хватает в оффбит. Кажется в первую половину ещё какой-то музыкальный элемент нужен. Может стаб или что-то такое простое, буквально пару нот. Эсид(?) лид в кульминации лишний. Он там ещё и прячется как-то странно как енот-воришка, будто боится выйти из тени. Ни туда, ни сюда

Кирилл добавляет оффбит хэтов, дополнительный плак и прячет под фильтр основную мелодию в кульминации, получилось офигительно! Эта версия и осталась финальной.

Итого

Создание треков — это длительный, кропотливый труд, который, к сожалению, мало кто замечает, так как все оценивают финальный результат и даже не догадываются что было за кулисами. Часто бывает так, что твой трек не удостаивается должного внимания среди куча другого музыкального контента и это очень обидно, ведь ты и твой напарник могут вкладывать десятки часов, чтобы добиться офигенного результата. Давайте в данном случае поддержим в первую очередь Кирилла он проделал титаническую работу и превратил трек в настоящий бэнгер 🔥 Под поддержкой я подразумеваю, чтобы вы послушали трек на любых из стриминговых платформ, поделились треком/моей статьей в соц сетях, чтобы трек услышали как можно больше людей, написали комментарий здесь, в личке о своих впечатлениях. Поддерживайте музыкантов прослушиваниями, вниманием, ведь это драйверы продолжать творить!

SoundCloud

YouTube

В скором времени здесь появятся ссылки на остальные стриминговые платформы. Как говорится stay tuned!

kolesa conf’23

Источник оригинала фото: Kolesa Group

07.10.2023 в Алмате состоялась ежегодная конференция Kolesa Conf’23. Kolesa Conf’23 — это масштабная конференция, объединяющая IT-сообщество Казахстана.

О Kolesa Group

Источник: Kolesa Group

Kolesa Group — казахстанская IT-компания, которая специализируется на создании сервисов по размещению частных и бизнес объявлений в сфере авто, недвижимости, товаров и услуг в Казахстане и Узбекистане.

Главными продуктами компании являются мобильное приложение kolesa.kz — торговая площадка для автомобилистов, сайт krisha.kz — размещение объявлений о недвижимости, market.kz — сайт бесплатных объявлений общей тематики. Продуктами компании ежемесячно пользуются 15 миллионов пользователей.

Компания была основана в 1996 году. Вначале своего пути компания занималась выпуском газетного издания «Колёса», где можно было разместить объявление о продаже своего авто с фотографиями и  техническими характеристиками.

Место проведения

Кинотеатр «Арман» на проспекте Достык. Источники: sovietarch.strelka, kino.kz, Kolesa Group

Кинотеатр «Арман» открылся для посетителей в 1968 году, здание было спроектировано в лучших традициях эпохи совесткого модернизма с барельефом на боковых фасадах, отражающий путь Казахстана. Внутри вас встречает закусочная и пространство для проведения развлекательных мероприятий. Сейчас кинотеатр насчитывает 4 кинозала два на первом этаже и два на втором.

Направления конференции

Количество направлений полностью совпадает с количеством залов кинотеатра «Арман».
WEB про Backend, Frontend, Security и QA
MANAGEMENT про запуск продукта, управление командой, изменение и управление процессами
MOBILE про iOS, Android разработку, QA, Security
DATA про ML, Product Analytics, Data Engineering

Больше всего докладов удалось послушать именно на направлении DATA. В качестве бонуса порекомендую по докладу с направления MANAGEMENT и MOBILE.

Многоступенчатое тестирование и зоопарк моделей в ClearML

Спикер: Андрей Шадриков, R&D Team Lead, Verigram
О докладе: Зачем делать много разных тестов ML-моделей и изменять их со временем? Этот подход помогает проверять модели на скрытые байесы, но добавляет проблемы в отслеживании зоопарка моделей. Как в Verigram решают проблему с отслеживанием зоопарка моделей, используя ClearML, как в связке с отчётами и GitLab снижается трение между командой разработки и бизнеса.
Основные тезисы доклада:
— В данных может быть много bias по разным причинам
— В ClearML можно разбивать тесты через параметры, можно искать по тегам, навешивать теги на модели, сохранять uncommited changes вместе с экспериментом
— ClearML проще расскатывается, чем MLFlow
— Если у вас уже готовая, настроенная инфраструктура, то переходить на ClearML не стоит
— ClearML — большой комбайн, может потреблять значительное кол-во ваших ресурсов

Эволюция подходов к персонализации в Авто.ру

Спикер: Вадим Кохтев, Руководитель ML-направления, Яндекс Вертикаль
О докладе: Путь Авто.ру в персональных рекомендациях, почему полезно строить платформенные решения и откуда у пользователя появилась особая роль
Основные тезисы доклада:
— Изначально на Авто.ру был атрибутивный поиск без рекомендаций и персонализации
— Решили внедрить бесконечную ленту как в VK, Instagram вместо главной страницы для повышения retention
— В рекомендациях начали показывать объявления из истории просмотренных, рекомендацию похожих
— Замешивание контента с помощью постов пользователей, статей про авто
— Персонализация с помощью определения намерений, что пользователь хочет делать на сервисе. В результате чего у посетителя появляется набор предсказаний и роль

Повышение качества данных с использованием Zero Bug Policy

Спикер: Олег Харатов, Technical unit lead, Авито
О докладе: Как системный подход по работе с проблемными данными помог в борьбе с ошибками в хранилище данных, какие метрики использовали для оценки этого подхода и как договаривались с владельцами данных
Основные тезисы доклада:
— В Avito Anchor modeling (6НФ), поэтому в детальном слое на проде крутятся несколько 10 тысяч таблиц
— Кол-во витрин растет очень быстро, а число ошибок растет еще быстрее
— Важно разделять витрины на несколько уровней важности: критичные, важные, стандартные, неважные
— Zero Bug Policy — это про то, что не стремишься решить все баги на проде, а лишь самые важные. Подход отвечает на вопросы какие, когда и кто
— Создали матрицу приоритетов в разрезе типа бага, важности витрин с SLA на исправление
— К большему числу багов необходимо подходить системно, трекать как быстро решаются проблемы с данными и оценивать метриками

Как правильно развивать продукт через исследования, поиск проблем и точек роста. Discovery в Kolesa Group

Спикер: Дмитрий Казаков, Директор по аналитике, Kolesa Group
О докладе: Как в Kolesa перешли к discovery-процессам, через какие сложности прошли и какие проблемы Discovery помогает решать. Доклад будет полезен командам, которые хотят поставить на поток поиск и внедрение фичей в продуктовых командах и которые сталкивались с неэффективностью в этих процессах.
Основные тезисы доклада:
— Discovery — поиск и работа с проблемами и точками роста
— Discovery нужно внедрять когда нехватка идей в продукте, есть искажения в принятии решений, есть фокус на project management, а не product management
— Double Diamond — основа Discovery в Kolesa. Спрашивайте у дизайнеров как строить продукт, они лучше понимают пользователей
— Команда Discovery состоит из Product Owner, аналитиков, дизайнеров, core команды сервиса, техлиды
— Систематизируйте хранение информации вокруг Discovery
— Инструменты Discovery должны быть такими, чтобы человеку со стороны было легко разобрать смысл написанного
— Выросло на 40% кол-во исследований и проверок идей в команде после внедрения Discovery
— Рост вовлеченности команды в продукт
— Начали работать от проблемы, а не от решений

Философия архитектуры

Спикер: Алексей Емелин, Руководитель группы разработки на Android, Yandex
О докладе: Основные трудозатраты программиста — это обдумывание кода, своего и чужого. А можно ли снизить эти трудозатраты? Как устроен процесс мышления? Есть ли методологии, помогающие лучше понять код? Поможет ли знание Канта и Гегеля глубже осознать логику MV* архитектуры и предположить, что будет после MVI? На эти и многие другие философские вопросы об архитектуре ПО Алексей постарался ответить в своем выступлении
Основные тезисы доклада:
— Цель архитектуры — уменьшить человеческие трудозатраты
— Понимание того как мы мыслим может приблизить к цели архитектуры
— Философия — наука о мышлении, дает методологию постижения истины
— Принцип историзма — при рассмотрении чего либо в окружающем мире нужно учитывать нормы того времени, то есть что было принято тогда, какие технологии были в то время, на какой стадии развития они находились
— «Все течет, все меняется», то есть все есть процесс. В процессе нет границ
— Противоречие или парадокс — двигатель прогресса по философии
— Рассматривая явления в движении в развитии, появляется возможность выявлять тенденции к изменениям и причина изменений строится в какой-то проблеме

Остальные активности и плюшки

При входе на конференцию нам раздавали welcome паки, где подарили следующие ништяки:

На втором этаже расположилась развлекательная часть конференции, было 7-8 стендов от самих Kolesa и их партнеров, где можно было побатлиться в игре по определению ЯП, пройти виртуальный квест, постоять на баланс-борде, поугадывать мемы на карте мемасов, выиграть призы, правильно ответив на IT-вопрос разной категории сложности и многое другое.

Источник: Kolesa Group

В конце основной части было выделено время на пообщаться за кружечкой прохладного, пофотографироваться и подготовиться отправиться на after-party в местном баре.

Источник: Kolesa Group

Полезные ссылки

Фото галерея конференции: Kolesa Group
Записи выступлений с направления DATA
Записи выступлений с направления MANAGEMENT
Записи выступлений с направления MOBILE
Записи выступлений с направления WEB

В качестве бонуса видеообзор конференции от команды Kolesa:

Python for Data Analysis Course

На днях презентовал внутри своей компании тренажер/курс по Python для анализа данных. Если вы когда-нибудь хотели начать программировать на Python или вам надоел Excel, и вы хотите попробовать что-то поинтереснее для аналитики или визуализации, то рекомендую начать с данного тренажера.

QR-Code на Github репо с тренажером

Для кого данный тренажер?

Тренажер отлично подойдет для:
— бизнес-экспертов, работающих с табличными данными
— дата-аналитиков
— людей, кто никогда не программировал на Python

Формат обучения

— Онлайн
— В своем темпе без каких-либо дедлайнов
— В среднем тренажер займет от 2 до 4 недель

Что нужно для тренажера?

— Доступ в Интернет
— Браузер Google Chrome (рекомендация)
— Аккаунт Google

Содержание тренажера

Тренажер состоит из 3-ех больших блоков, написанных на Jupyter Notebook:

  1. Введение в Python
  2. Основы Numpy и Pandas
  3. Визуализация данных

После прохождения тренажера вы будете уметь:

— Писать и читать простые программы на Python
— Загружать excel, csv в Jupyter, манипулировать данными с помощью библиотек pandas, numpy
— Визуализировать данные с помощью библиотек matplotlib и seaborn
— Сможете использовать Jupyter Notebook в своей повседневной работе

Обратная связь и помощь

В случае если вы хотите поделиться обратной связью, указать на ошибки, опечатки или у вас возник вопрос по теме тренажера, то можете связаться со мной здесь.

Ранее Ctrl + ↓