Notes on using pyspark

Notes on points that come up regularly when using Spark; updated from time to time.

Packaging a pyspark environment

Build a virtual environment locally first, package it, and submit it to the cluster. Using conda as an example:

conda create -n py37 python=3.7 pip

source activate py37
pip install -r requirements.txt

# install conda pack
pip install conda-pack

# Pack environment my_env into out_name.tar.gz
conda pack -n py37 -o py37_env.tar.gz

After packing, recreate the environment and check it:

mkdir -p test_env
tar -xzf py37_env.tar.gz -C test_env
source test_env/bin/activate
# check that the installed packages match
pip list

A side note: to see the packages installed in a conda environment, look under ~conda_path/envs/py37/lib/python3.7/site-packages.
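
The same directory can also be located from inside Python (a small sketch, assuming the interpreter of the py37 environment above is active):

import site
import sys

# print the interpreter prefix and its site-packages directories,
# e.g. .../envs/py37/lib/python3.7/site-packages for the env above
print(sys.prefix)
print(site.getsitepackages())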

Submitting with spark-submit

/opt/tiger/spark_deploy/spark-stable/bin/spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --queue ${queue_name} \
    --conf spark.driver.memory=4g \
    --conf spark.dynamicAllocation.enabled=true \
    --conf spark.dynamicAllocation.minExecutors=10 \
    --conf spark.dynamicAllocation.initialExecutors=10 \
    --conf spark.dynamicAllocation.maxExecutors=20 \
    --conf spark.executor.memory=4g \
    --conf spark.executor.cores=4 \
    --name spark_test_env \
    --conf spark.hadoop.yarn.cluster.name=${cluster_name} \
    --conf spark.pyspark.python=./py37_env/bin/python3 \
    --conf spark.pyspark.driver.python=./py37_env/bin/python3 \
    --archives hdfs_path/py37_env.tar.gz#py37_env \
    main.py
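
To double-check that the executors really pick up the shipped interpreter, a hedged sketch (assuming the job above, with the archive unpacked as ./py37_env):

import sys
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# driver-side interpreter: should point into ./py37_env
print(sys.executable, file=sys.stderr)

# executor-side interpreter: collect sys.executable from a trivial job
print(sc.parallelize([0], 1).map(lambda _: __import__('sys').executable).collect(), file=sys.stderr)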


Packaging a project

When the pyspark main program depends on only a few files, they can be added one by one with --py-files. When there are many dependencies, or even a whole project with a complex directory layout, package everything into a .zip or .egg and pass that to --py-files; these files are added to the PYTHONPATH.

Note that when building the zip, the project files must sit at the top level of the archive. One correct way to package:

cd project_home
zip -r ../project_home.zip .
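
The zip can also be shipped from inside the job with SparkContext.addPyFile, which likewise puts it on the executors' PYTHONPATH (a sketch; the zip path and my_module are placeholders):

from pyspark import SparkContext

sc = SparkContext.getOrCreate()
# distribute the project zip to every executor; its top-level contents become importable
sc.addPyFile('hdfs_or_local_path/project_home.zip')

import my_module  # assumes project_home.zip contains my_module.py or my_module/ at its top level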

Common parameters

Spark Conf

{
  "spark.dynamicAllocation.minExecutors": "200",
  "spark.dynamicAllocation.initialExecutors": "200",
  "spark.dynamicAllocation.maxExecutors": "500",
  "spark.driver.memory": "2g",
  "spark.executor.memory": "4g",
  "spark.driver.cores": "4",
  "spark.yarn.executor.memoryOverhead": "4g",
  "spark.sql.adaptive.enabled": "true",
  "spark.sql.adaptive.skewedJoin.enabled": "true",
  "spark.sql.adaptive.skewedJoinWithAgg.enabled": "true",
  "spark.sql.adaptive.multipleSkewedJoin.enabled": "true"
}

The last three parameters are the usual settings for Spark AE (adaptive execution) skewed-join optimization, used to deal with data skew. Before tuning for skew, first check whether the key values are evenly distributed; abnormal values such as nulls should be filtered out up front, as in the sketch below.
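
A minimal sketch of that pre-filtering, assuming DataFrames df and df_other joined on a column key:

from pyspark.sql import functions as F

# drop rows whose join key is null or empty, so the "null bucket"
# does not pile up in a single skewed task
df_clean = df.filter(F.col('key').isNotNull() & (F.col('key') != ''))
df_joined = df_clean.join(df_other, on='key', how='inner')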

Dependency resources

Dependencies are mostly brought in through the following options:

  • files: a comma-separated list of files, placed in the working directory of each executor
  • py-files: a comma-separated list of .zip, .egg, or .py files, placed on the PYTHONPATH of the Python app
  • jars: a comma-separated list of local jars, added to the driver and executor classpath
  • archives: comma-separated archive files, distributed to the executors and unpacked; the unpack directory name can be set with #

The main difference between them is whether the files end up on the classpath / PYTHONPATH. In general, data files are added with --files or --archives, while code files are added with --py-files so that they can be imported directly; a small illustration follows the reference below.

For details see "What's the difference between --archives, --files, --py-files in pyspark job arguments".
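
A sketch of the difference in use (stopwords.txt and my_module are placeholders): a file shipped with --files is resolved by path, while a package shipped with --py-files is imported.

from pyspark import SparkFiles

# --files .../stopwords.txt: resolve its local path in the executor's working directory
stopwords_path = SparkFiles.get('stopwords.txt')

# --py-files project_home.zip: already on PYTHONPATH, import directly
import my_module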

Running models

When running deep-learning models on Spark (on CPU, of course), use mapPartitions to make full use of the parallelism while amortizing the model-loading time over a whole partition.

First, the official example:

>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> def f(iterator): yield sum(iterator)
>>> rdd.mapPartitions(f).collect()
[3, 7]

Both the input and the output are iterators, processed one record at a time. If we want to predict batch by batch instead, there are three options:

  • keep consuming the iterator record by record, accumulate a batch, run it through the model, and yield the results
  • use a streaming-style dataset, such as PyTorch's IterableDataset
  • convert the whole partition into a list and process it in one go, being careful not to run out of memory

Examples of the first two approaches are given below; a sketch of the third follows them.

Approach 1: accumulate a batch

def infer_batch_by_partition(partition):
    batch_size = 16
    data_batch = []

    while True:
        try:
            content = next(partition)
            data_batch.append(content)

            if len(data_batch) >= batch_size:
                # run the accumulated batch through the model
                predicts = model_predict(data_batch)
                for p in predicts:
                    yield p
                data_batch = []
        except StopIteration:
            # the last batch may hold fewer than batch_size records
            if data_batch:
                predicts = model_predict(data_batch)
                for p in predicts:
                    yield p
            break

rdd_pair_pred = rdd_pair.repartition(2000).mapPartitions(infer_batch_by_partition)

Approach 2: IterableDataset

An example of an IterableDataset:

class MyIterableDataset(torch.utils.data.IterableDataset):
    def __init__(self, your_args):
        super().__init__()
        # this depends on your dataset; suppose it contains
        # the paths of the images you want to load
        self.dataset = your_args  # e.g. a list of image paths
    def __iter__(self):
        for image_path in self.dataset:
            # read an individual sample and its label
            sample, label = read_single_example(image_path)
            yield sample, label

The actual inference code: a padding helper, the IterableDataset wrapper, and a generator that yields predictions batch by batch:

import json

import torch
from scipy.special import softmax
from torch.utils.data import DataLoader, IterableDataset


def seq_padding(seq, max_len, value=0):
    x = [value] * max_len
    x[:len(seq)] = seq[:max_len]
    return x

class DedupePairDatasetV2Iter(IterableDataset):

    def __init__(self, text_data_iter, tokenizer, max_len=128, max_sen=16, mode='test'):
        super(DedupePairDatasetV2Iter, self).__init__()
        self.max_len = max_len
        self.max_sen = max_sen
        self.tokenizer = tokenizer
        self.text_data_iter = text_data_iter

    def gen_doc_input(self, doc):
        sentences = doc['summary']
        sentences = sentences[:self.max_sen]

        # build the BERT input for each sentence
        input_ids_list, token_type_ids_list, attention_mask_list = [], [], []
        for sen in sentences:
            token_type_ids = [0] * self.max_len
            input_ids, attention_mask = [], []
            for ch in sen[:self.max_len - 2]:
                token_idx = self.tokenizer.encode(ch)
                if len(token_idx) != 3:
                    # whitespace etc.: map to [UNK]
                    input_ids.append(100)
                else:
                    input_ids.append(token_idx[1])
                attention_mask.append(1)
            input_ids = [101] + input_ids + [102]
            attention_mask = [1] + attention_mask + [1]

            input_ids_list.append(seq_padding(input_ids, self.max_len))
            token_type_ids_list.append(seq_padding(token_type_ids, self.max_len))
            attention_mask_list.append(seq_padding(attention_mask, self.max_len))
        # pad doc sentence
        for i in range(len(input_ids_list), self.max_sen):
            input_ids_list.append([101] + [0] * (self.max_len - 2) + [102])
            token_type_ids_list.append([0] * self.max_len)
            attention_mask_list.append([1] * self.max_len)

        # record the number of real sentences; each field is batched separately
        n_sen = min(len(sentences), self.max_sen)
        sen_mask = [1] * n_sen + [0] * (self.max_sen - n_sen)
        inputs = {
            'input_ids': torch.tensor(input_ids_list, dtype=torch.long),
            'token_type_ids': torch.tensor(token_type_ids_list, dtype=torch.long),
            'attention_mask': torch.tensor(attention_mask_list, dtype=torch.long),
            'sen_mask': torch.tensor(sen_mask, dtype=torch.long)
        }

        return inputs

    def __iter__(self):
        for item in self.text_data_iter:
            left = self.gen_doc_input(item['left'])
            right = self.gen_doc_input(item['right'])
            yield left, right, json.dumps(item, ensure_ascii=False)



def infer_duplicate_pair_iter(model, 
                              tokenizer, 
                              pair_list, 
                              batch_size=16, 
                              max_sen=16, 
                              max_seq_len=128,
                              sent_hidden_size=64):
    device = torch.device('cpu')

    # test_set = DedupePairDatasetV2(pair_list, tokenizer, mode='test', max_sen=max_sen, max_len=max_seq_len)
    test_set = DedupePairDatasetV2Iter(pair_list, tokenizer, mode='test', max_sen=max_sen, max_len=max_seq_len)
    test_params = {'batch_size': batch_size,
                   'shuffle': False,
                   'num_workers': 0,
                   }
    testing_loader = DataLoader(test_set, **test_params)

    model.eval()
    with torch.no_grad():
        for _, inputs in enumerate(testing_loader, 0):
            left, right, items = inputs
            for k in left:
                left[k] = left[k].to(device, dtype=torch.long)
            for k in right:
                right[k] = right[k].to(device, dtype=torch.long)
            logits = model(left, right)
            logits = logits.cpu().detach().numpy()
            score = softmax(logits, axis=1).tolist()
            for pair, pred in zip(items, score):
                pair = json.loads(pair)
                pair['left'].pop('summary', '')
                pair['right'].pop('summary', '')
                pair['score'] = pred[1]
                yield json.dumps(pair, ensure_ascii=False)


def infer_batch_by_partition(partition):
    # model and tokenizer are assumed to be loaded once per executor (e.g. at module import time)
    return infer_duplicate_pair_iter(model, tokenizer, partition, batch_size=8)

rdd_pair_pred = rdd_pair.repartition(2000).mapPartitions(infer_batch_by_partition)
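
For completeness, a sketch of the third option (materialize the partition as a list; model_predict is the same assumed helper as in approach 1):

def infer_whole_partition(partition):
    # only safe when a single partition comfortably fits in executor memory
    rows = list(partition)
    if not rows:
        return iter([])
    return iter(model_predict(rows))

rdd_pair_pred = rdd_pair.repartition(2000).mapPartitions(infer_whole_partition)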


Basic operations

Reading JSON

from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.sql.warehouse.dir", "file:///C:/temp").appName("readJSON").getOrCreate()

readJSONDF = spark.read.json('Simple.json')
readJSONDF.show(truncate=False)

Reference: 用pyspark读取json数据的方法汇总 (a round-up of ways to read JSON data with pyspark)
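
One more way that comes up in practice (a sketch; the path is a placeholder): newline-delimited JSON can also be read through an RDD of JSON strings.

# read each line as a raw JSON string, then let spark.read.json infer the schema
rdd = spark.sparkContext.textFile('hdfs_path/data.jsonl')
df_from_rdd = spark.read.json(rdd)
df_from_rdd.show(truncate=False)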

Reading CSV

from pyspark.sql.types import StringType

df = spark.read.csv(hdfs_path, header=True, inferSchema="true")
df = df.select('doc_id', 'title')
df = df.withColumnRenamed('title', 'doc_title')
df = df.withColumn('doc_id', df['doc_id'].cast(StringType()))

Writing to a Hive partition

# overwrite only the specified partition instead of the whole table
df.createOrReplaceTempView('tmp_push')
query = 'INSERT OVERWRITE TABLE aaa.bbb PARTITION(date="${date}") SELECT * FROM tmp_push'
spark.sql(query)
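
A DataFrame-API alternative (a sketch, for Spark 2.3+ with dynamic partition overwrite enabled; column order must match the table, partition column last):

# with dynamic mode, insertInto(overwrite=True) only replaces the partitions present in df
spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
df.write.insertInto('aaa.bbb', overwrite=True)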

Unioning multiple RDDs

rdd_list = []
rdd_list.append(rdd1)
rdd_list.append(rdd2)
rdd_union = sc.union(rdd_list)

join

Prefer the DataFrame API over raw RDDs where possible; DataFrames benefit from query optimization.

left_anti is only available through the DataFrame API:

df_a_not_in_b = df_a.join(df_b, on=['key'], how='left_anti')
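
One of those DataFrame optimizations worth triggering explicitly is a broadcast join when one side is small (a sketch; df_small is a placeholder):

from pyspark.sql import functions as F

# broadcast the small side to every executor and avoid shuffling the large side
df_joined = df_a.join(F.broadcast(df_small), on=['key'], how='left')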

udf

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

def make_pair_key(id1, id2):
    id1, id2 = str(id1), str(id2)
    if id1 > id2:
        id1, id2 = id2, id1
    return id1 + '-' + id2

udf_make_pair_key = F.udf(make_pair_key, StringType())
df = df.withColumn('key', udf_make_pair_key(df['id1'], df['id2']))
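
The same UDF can also be declared with the decorator form of F.udf (a sketch):

@F.udf(returnType=StringType())
def make_pair_key_v2(id1, id2):
    # order the two ids so the pair key is symmetric
    return '-'.join(sorted([str(id1), str(id2)]))

df = df.withColumn('key', make_pair_key_v2(df['id1'], df['id2']))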

Tricks

Installing dependencies at runtime

Some packages are awkward to install, for example spacy's zh_core_web_md model. The normal installation is:

pip install -U pip setuptools wheel
pip install -U spacy
python -m spacy download zh_core_web_md

The last step cannot go into requirements.txt. One trick is to upload the model file as a zip and unzip it in code; this works in practice:

import spacy


def load_spacy_model(env='local'):
    import zipfile

    def unzip_model(zip_file, out_path='.'):
        # extract every member of the archive into out_path
        with zipfile.ZipFile(zip_file, "r") as z_file:
            for member in z_file.namelist():
                z_file.extract(member, out_path)

    zip_file = 'zh_core_web_md-3.0.0.zip'
    out_path = '.'

    unzip_model(zip_file, out_path)
    return spacy.load(out_path + '/zh_core_web_md-3.0.0')

But this is more work than necessary; a better approach is to load the model directly from the zip:

from pyspark.sql import SparkSession
import os
import sys
import zipfile

spark = SparkSession.builder.enableHiveSupport().getOrCreate()


def test_load_model():
    sys.path.insert(0, 'zh_core_web_md-3.0.0.zip')  # the zip is shipped via --py-files and used as a path entry

    files = os.listdir('./')
    print('x' * 60, file=sys.stderr)
    print('\n'.join(files), file=sys.stderr)

    import spacy
    nlp = spacy.load('zh_core_web_md-3.0.0.zip/zh_core_web_md-3.0.0')
    text = '风寒三友即岁寒三友,指松、竹、梅三种植物。'
    doc = nlp(text)
    for token in doc:
        # info = [token.text, token.lemma_, token.pos_, token.tag_, token.dep_,
        #             token.shape_, token.is_alpha, token.is_stop]
        print(token.text, file=sys.stdout)

The contents of zh_core_web_md-3.0.0:

accuracy.json
attribute_ruler
config.cfg
meta.json
ner
parser
senter
tagger
tok2vec
tokenizer
vocab