S'S ALGORITHM

Spark

graph TD
    A[用户应用程序] --> B[驱动程序]
    B --> C[SparkSession/Context]
    C --> D[集群管理器ClusterManager]
    D -->|分配资源| E[集群中的工作节点]
    E -->|启动| F[任务调度器schedulers]
    F -->|处理任务| G[执行器ExecutorCacheTasks]
    G -->|生成| H[RDDs/Datasets]
    H -->|执行操作| I[变换和行动]
    I -->|保存结果| J[存储系统]

    %% 添加额外的细节
    D -->|管理| K[集群资源]
    E -->|并行处理| L[任务]
    L -->|结果返回| B

Hadoop 和 Spark

是两种用于处理大数据的分布式计算框架。它们各自有不同的设计目标、架构和应用场景。下面是它们的主要区别:

  1. 核心组件和架构
  1. 计算模型
  1. 性能和效率
  1. 易用性
  1. 应用场景
  1. 生态系统和工具
  1. 兼容性和部署
  1. 总结
特性 Hadoop Spark
存储系统 HDFS 多种选择(HDFS、S3、Cassandra、HBase 等)
计算模式 批处理 内存计算、批处理、流处理、交互式查询等
编程模型 MapReduce RDDs、DataFrames、Datasets
性能 磁盘 I/O 频繁,较慢 内存中计算,快速
易用性 较复杂,代码量大 高级 API,支持多语言,易用性强
应用场景 离线批处理,日志分析 实时分析,机器学习,流处理
生态系统 Pig、Hive、HBase、ZooKeeper Spark SQL、MLlib、GraphX、Spark Streaming
兼容性 YARN YARN、Mesos、Kubernetes

Spark 通常被视为 Hadoop 的补充,而非完全替代。根据具体的需求和应用场景,选择合适的框架可以带来更好的性能和效率。

Databricks

创建简单的ETL Pipeline

关键词

必读doc

官方文档GCP版

Photon提升运算性能 UnityCatalog整合workspaces ControlPlane & DataPlane Classic DataPlane Serverless DataPlane Three-level namespaces Metastore

delta table 和 delta live table

在Databricks中,Delta Table和Delta Live Table是两种用于管理数据的表类型,但它们的用途和功能有所不同。以下是它们之间的主要区别:

1. Delta Table:

2. Delta Live Table (DLT):,像是MV或者流数据表!!!

总结

根据具体需求,可以选择使用Delta Table进行灵活的数据管理,或使用Delta Live Table来简化和自动化数据流水线。

PySpark

在 Spark 的机器学习库 MLlib 中,Local VectorsLabeledPoint 是用于表示和处理机器学习数据的两种基本数据结构。

1. Local Vectors

Local Vectors 是 MLlib 中用于表示数据点特征的向量。每个向量包含一组特征值,可以用来表示数据集中的一个实例。Spark 中有三种类型的 Local Vectors

2. LabeledPoint

LabeledPoint 是 MLlib 中表示带标签的特征向量的一种数据结构,通常用于监督学习(例如分类和回归任务)。一个 LabeledPoint 包含两个部分:

假设你有一个分类问题,每个数据点由三个特征表示,并且有一个二元分类标签(0 或 1)。可以使用 LabeledPoint 来表示这样的数据点:

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

# 创建一个密集向量
features = Vectors.dense([1.0, 0.5, 3.0])

# 创建一个标签为1的 LabeledPoint
labeled_point = LabeledPoint(1.0, features)

# labeled_point = (label=1.0, features=[1.0, 0.5, 3.0])

在 Spark 的监督学习任务中,数据集通常由许多 LabeledPoint 组成,每个 LabeledPoint 代表一个训练样本,包含其特征和对应的标签。

通过使用这些数据结构,MLlib 可以有效地处理和训练大规模机器学习模型。

3. Pipeline是什么

在 PySpark 中,Pipeline 是用于构建和管理机器学习工作流的高层次 API。它帮助将数据预处理、特征提取、模型训练等步骤串联起来,以简化机器学习任务的执行和管理。

Pipeline 是一个由多个阶段(Stages)组成的有序序列,每个阶段可以是一个数据转换器(Transformer)或一个估计器(Estimator)。Pipeline 可以被看作是一个数据处理的流水线,数据按顺序通过各个阶段,逐步被转换、处理和预测。

4. Pipeline 的基本工作流程

  1. 创建数据预处理和模型对象:首先创建一系列 transformers 和 estimators,对应机器学习任务的各个步骤。
  2. 将步骤组合成 Pipeline:使用 Pipeline 类将所有步骤组合成一个管道。
  3. 拟合 Pipeline:使用训练数据拟合整个 pipeline,pipeline 会按顺序执行各个步骤,并生成一个训练好的模型。
  4. 预测或转换:使用拟合后的 pipeline 进行预测或数据转换。

5. Pipeline 示例

以下是一个简单的示例,用于构建和使用 PySpark 的 Pipeline 来处理数据并训练一个机器学习模型。

我们有一个包含特征的 DataFrame,并且目标是预测某个分类标签。

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression

# 创建 SparkSession
spark = SparkSession.builder.appName("PipelineExample").getOrCreate()

# 假设我们有一个包含特征的数据集
data = spark.createDataFrame([
    (0, "a", 1.0, 10.0),
    (1, "b", 2.0, 20.0),
    (2, "a", 3.0, 30.0),
    (3, "b", 4.0, 40.0)
], ["id", "category", "feature1", "feature2"])

# 步骤1: 将类别特征转换为数值
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")

# 步骤2: 将所有特征列组合成一个向量列
assembler = VectorAssembler(inputCols=["categoryIndex", "feature1", "feature2"], outputCol="features")

# 步骤3: 使用逻辑回归进行分类
lr = LogisticRegression(featuresCol="features", labelCol="id")

# 将所有步骤组合到 Pipeline 中
pipeline = Pipeline(stages=[indexer, assembler, lr])

# 拟合 pipeline
model = pipeline.fit(data)

# 使用 pipeline 对新数据进行预测
predictions = model.transform(data)

# 显示结果
predictions.select("id", "features", "prediction").show()

代码解释

6. Pipeline 的优势

使用 PySpark 的 Pipeline,可以让你更有效地管理和执行复杂的机器学习工作流。

7. Feature Extraction&Transformation技术

在 PySpark 中,Feature Extraction 是从原始数据中提取有用的特征以供机器学习模型使用的过程。PySpark 提供了多种工具和技术来实现特征提取,主要针对文本数据、数值数据、类别数据等不同类型的输入数据。以下是一些常见的 Feature Extraction 技术及其使用方法。

1. Tokenizer 和 RegexTokenizer

from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql import SparkSession

# 创建 SparkSession
spark = SparkSession.builder.appName("TokenizerExample").getOrCreate()

# 示例数据
data = spark.createDataFrame([
    (0, "Hello, how are you?"),
    (1, "I'm doing fine, thank you!")
], ["id", "sentence"])

# 使用 Tokenizer
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
words_data = tokenizer.transform(data)

# 使用 RegexTokenizer
regex_tokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
regex_words_data = regex_tokenizer.transform(data)

words_data.show(truncate=False)
regex_words_data.show(truncate=False)

2. CountVectorizer

from pyspark.ml.feature import CountVectorizer

# 示例数据
data = spark.createDataFrame([
    (0, ["spark", "is", "great"]),
    (1, ["pyspark", "is", "awesome"]),
    (2, ["spark", "and", "pyspark", "are", "powerful"])
], ["id", "words"])

# 使用 CountVectorizer
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=5, minDF=1.0)
model = cv.fit(data)
vectorized_data = model.transform(data)

vectorized_data.show(truncate=False)

3. TF-IDF (Term Frequency-Inverse Document Frequency)

from pyspark.ml.feature import HashingTF, IDF

# 假设 words_data 是之前使用 Tokenizer 生成的 DataFrame
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurized_data = hashingTF.transform(words_data)

# 计算TF-IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(featurized_data)
tfidf_data = idf_model.transform(featurized_data)

tfidf_data.select("id", "features").show(truncate=False)

4. Word2Vec

from pyspark.ml.feature import Word2Vec

# 使用 Word2Vec
word2Vec = Word2Vec(vectorSize=3, inputCol="words", outputCol="features")
model = word2Vec.fit(data)
word2vec_data = model.transform(data)

word2vec_data.show(truncate=False)

5. PCA (Principal Component Analysis)

from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

# 示例数据
data = spark.createDataFrame([
    (0, Vectors.dense([1.0, 2.0, 3.0, 4.0, 5.0])),
    (1, Vectors.dense([6.0, 7.0, 8.0, 9.0, 10.0]))
], ["id", "features"])

# 使用 PCA
pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(data)
pca_result = model.transform(data)

pca_result.show(truncate=False)

6. StringIndexer

from pyspark.ml.feature import StringIndexer

# 示例数据
data = spark.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "b"),
    (5, "c")
], ["id", "category"])

# 使用 StringIndexer
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed_data = indexer.fit(data).transform(data)

indexed_data.show(truncate=False)

7. OneHotEncoder

from pyspark.ml.feature import OneHotEncoder

# 假设 indexed_data 是使用 StringIndexer 生成的数据
encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
encoded_data = encoder.fit(indexed_data).transform(indexed_data)

encoded_data.show(truncate=False)

8. VectorAssembler

from pyspark.ml.feature import VectorAssembler

# 示例数据
data = spark.createDataFrame([
    (0, 1.0, 0.1, 3.0),
    (1, 2.0, 0.2, 6.0),
    (2, 3.0, 0.3, 9.0)
], ["id", "feature1", "feature2", "feature3"])

# 使用 VectorAssembler
assembler = VectorAssembler(inputCols=["feature1", "feature2", "feature3"], outputCol="features")
assembled_data = assembler.transform(data)

assembled_data.show(truncate=False)

PySpark 提供了多种特征提取工具来处理不同类型的数据,包括文本、数值和类别数据。这些工具可以帮助你将原始数据转化为适合机器学习模型的数值向量,从而提高模型的表现力和准确性。通过结合使用这些特征提取技术,你可以构建更复杂和有效的机器学习管道。

8. Improve Model

可能改进我们代码的模型评估指标的方法:

特征工程

超参数调整

集成方法

文本矢量化方法

平衡数据集

先进的评估技术

增加数据

错误分析

模型选择

正则化强度

9. Performance Optimization

当然!在PySpark中,提升性能的关键点包括分区优化、广播变量使用和DataFrame操作优化。下面是每个方面的详细讲解:

1. 分区(Partitioning)

2. 广播变量(Broadcast Variables)

3. DataFrame操作优化(Optimizing DataFrame Operations)

(累加器)是一种用于在任务之间汇总信息的工具,主要用于在分布式计算过程中进行统计或计数。累加器的常见用途包括统计错误、计数操作以及汇总中间结果等。

Accumulator的特点

  1. 只读变量:累加器在任务执行过程中只会增加,不会减少。它们只能由驱动程序读取,但不能由任务进行修改。这保证了累加器的值在分布式计算中的一致性和准确性。

  2. 跨任务共享:累加器的值在所有工作节点间共享。任务可以对累加器进行累加,但不能进行其他类型的操作。

  3. 支持不同的数据类型:PySpark提供了对long类型的累加器支持,同时自定义累加器可以用于复杂的数据类型(如集合、列表等)。

如何使用Accumulator

以下是使用PySpark累加器的基本步骤:

  1. 创建累加器

在Spark中,累加器通过SparkContext创建。Spark内置支持LongAccumulatorDoubleAccumulator,也可以自定义累加器。

from pyspark import SparkContext

sc = SparkContext(appName="AccumulatorExample")

# 创建一个Long累加器
accumulator = sc.accumulator(0)
  1. 在任务中使用累加器

在RDD的转换操作(如mapforeach)中使用累加器。例如,可以在foreach操作中增加累加器的值。

def process_partitions(x):
    global accumulator
    accumulator.add(1)

rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.foreach(process_partitions)

print("Accumulated value: ", accumulator.value)
  1. 使用自定义累加器

自定义累加器需要继承AccumulatorParam类,定义累加器的行为。

from pyspark import AccumulatorParam

class ListAccumulatorParam(AccumulatorParam):
    def zero(self, value):
        return []

    def addInPlace(self, acc1, acc2):
        return acc1 + acc2

list_accumulator = sc.accumulator([], ListAccumulatorParam())

def add_to_list(x):
    list_accumulator.add([x])

rdd = sc.parallelize([1, 2, 3])
rdd.foreach(add_to_list)

print("Accumulated list: ", list_accumulator.value)

使用累加器可以帮助在分布式计算中进行简单的汇总和统计,但需要注意其局限性和在任务执行中的行为。

通过以上方法,可以显著提高PySpark作业的性能,减少计算时间和资源消耗。根据具体的工作负载和数据特性,选择合适的优化策略。

概念总结

Apache Spark、PySpark 和 Databricks 之间存在紧密的关系,它们都是用于处理大规模数据的技术工具,但它们在生态系统中的角色和功能有所不同。

1. Apache Spark

Apache Spark 是一个开源的分布式计算引擎,设计用于快速处理大规模数据集。它支持批处理、流处理、SQL查询、机器学习和图计算等多种任务。Spark 可以在多种计算资源上运行,包括独立集群、Hadoop YARN、Apache Mesos 以及 Kubernetes。

Spark 的主要特点包括:

2. PySpark

PySpark 是 Spark 的 Python API,它允许开发者使用 Python 编写 Spark 程序。PySpark 提供了与 Spark 原生 Scala API 类似的功能,使得 Python 用户可以利用 Spark 强大的分布式计算能力。

PySpark 的主要特点包括:

3. Databricks

Databricks 是一个基于云的企业级数据平台,最初由 Apache Spark 的创始团队创建。它在云环境中托管 Spark,并提供了一个易于使用的集成开发环境(IDE),结合了大规模数据处理、机器学习和数据分析的功能。Databricks 提供了丰富的工具和集成,帮助企业在云上快速部署和管理 Spark 应用程序。

Databricks 的主要特点包括:

4. 共通使用的包

以下是 Spark、PySpark 和 Databricks 中常用的一些库和工具包:

通过这些工具和包,Spark、PySpark 和 Databricks 为处理大规模数据提供了一个强大而灵活的生态系统,帮助用户快速开发、测试和部署数据驱动的应用程序。