Notes on PySpark
A collection of frequently used tips gathered while working with Spark, updated from time to time~
pyspark环境打包
先本地构建虚拟环境,打包后提交到集群,已conda为例
conda create -n py37 python=3.7 pip
source activate py37
pip install -r requirements.txt
# install conda pack
pip install conda-pack
# Pack environment my_env into out_name.tar.gz
conda pack -n py37 -o py37_env.tar.gz
Once packaged, you can recreate the environment to double-check it:
mkdir -p test_env
tar -xzf py37_env.tar.gz -C test_env
source test_env/bin/activate
# check that the installed packages match
pip list
A side note: if you want to see which packages a conda environment has installed, the path is ~conda_path/envs/py37/lib/python3.7/site-packages
Submitting with spark-submit
/opt/tiger/spark_deploy/spark-stable/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${queue_name} \
--conf spark.driver.memory=4g \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=10 \
--conf spark.dynamicAllocation.initialExecutors=10 \
--conf spark.dynamicAllocation.maxExecutors=20 \
--conf spark.executor.memory=4g \
--conf spark.executor.cores=4 \
--name spark_test_env \
--conf spark.hadoop.yarn.cluster.name=${cluster_name} \
--conf spark.pyspark.python=./py37_env/bin/python3 \
--conf spark.pyspark.driver.python=./py37_env/bin/python3 \
--archives hdfs_path/py37_env.tar.gz#py37_env \
main.py
Packaging your project
When the PySpark main program depends on only a few files, they can be added one by one with --py-files. But when there are many dependencies, or even a whole project with a fairly complex directory structure, you need to package them; archives in .zip, .egg and similar formats can also be passed with --py-files, and these files are added to the PYTHONPATH.
Note that when building the zip, the contents must sit at the top level of the archive. Here is one correct way to package it:
cd project_home
zip -r ../project_home.zip .
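With the zip built this way, it can be shipped with --py-files and its top-level modules imported directly on the driver and executors. A minimal sketch, assuming a hypothetical module my_module sitting at the top level of the zip:
# submit with (in addition to the options shown earlier):
#   --py-files hdfs_path/project_home.zip
# then, inside main.py, top-level modules of the zip are importable as usual
import my_module  # hypothetical: any top-level module/package inside the zip
my_module.run()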
Common parameters
Spark Conf
{
"spark.dynamicAllocation.minExecutors": "200",
"spark.dynamicAllocation.initialExecutors": "200",
"spark.dynamicAllocation.maxExecutors": "500",
"spark.driver.memory": "2g",
"spark.executor.memory": "4g",
"spark.driver.cores": "4",
"spark.yarn.executor.memoryOverhead": "4g",
"spark.sql.adaptive.enabled": "true",
"spark.sql.adaptive.skewedJoin.enabled": "true",
"spark.sql.adaptive.skewedJoinWithAgg.enabled": "true",
"spark.sql.adaptive.multipleSkewedJoin.enabled": "true"
}
The last three parameters are the usual settings for Spark AE's SkewedJoin optimization, used to deal with data skew (when handling skew, first check whether the key values are evenly distributed; abnormal values such as nulls should be filtered out beforehand).
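As a concrete illustration of that advice, here is a minimal sketch that drops null/empty join keys before a skewed join (the DataFrames df_a, df_b and the key column uid are hypothetical):
from pyspark.sql import functions as F

# drop null/empty keys before joining so they don't pile up in a single partition
df_a_clean = df_a.filter(F.col('uid').isNotNull() & (F.col('uid') != ''))
df_b_clean = df_b.filter(F.col('uid').isNotNull() & (F.col('uid') != ''))
df_joined = df_a_clean.join(df_b_clean, on='uid', how='inner')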
Dependency resources
Resources are usually brought in through the following options:
- files: comma-separated files, placed in the working directory of each executor
- py-files: comma-separated ".zip", ".egg" or ".py" files, placed on the PYTHONPATH of the Python app
- jars: comma-separated local jar files, included on the classpath of the driver and executors
- archives: paths of archive files; the archives are distributed to the executors and unpacked, and the unpack path can be given an alias with #
The main difference between these options is whether the files end up on the classpath or the PYTHONPATH. In general, data files are added with --files or --archives, while code files are added with --py-files; the latter can be imported directly.
For details, see "What's the difference between --archives, --files, py-files in pyspark job arguments".
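A minimal sketch of how these resources are accessed at runtime (the file names, alias and input RDD are hypothetical):
# submitted with: --files dict.txt --archives hdfs_path/data.tar.gz#data
def use_resources(rows):
    # a file passed via --files sits in the executor's working directory;
    # its path can also be resolved with pyspark's SparkFiles.get('dict.txt')
    with open('dict.txt') as f:
        vocab = set(f.read().splitlines())
    # an archive aliased with '#' is unpacked under that alias
    with open('data/lookup.txt') as f:
        lookup = f.read().splitlines()
    for r in rows:
        yield (r, r in vocab, len(lookup))

rdd_out = rdd.mapPartitions(use_resources)  # rdd is a hypothetical input RDD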
Running models
When running deep-learning models on Spark (on CPU, of course), to make full use of the parallelism and reduce the overhead of model loading, you can use mapPartitions.
First, the official example:
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> def f(iterator): yield sum(iterator)
>>> rdd.mapPartitions(f).collect()
[3, 7]
Both the input and the output are iterators, so records are processed one at a time. But if we want to run predictions batch by batch, there are three obvious approaches:
- Keep consuming the iterator record by record, accumulate a batch, run it through the model, then yield the results back out as an iterator
- Use a dataset built for streaming data, such as PyTorch's IterableDataset
- Convert the whole partition into a list and process it, taking care not to run out of memory (a brief sketch of this is given after the two examples below)
Examples of the first two approaches:
Approach 1: accumulate a batch
def infer_batch_by_partition(partition):
    batch_size = 16
    count = 0
    row = partition
    data_batch = []
    while True:
        try:
            content = next(row)
            if count < batch_size:
                count += 1
                data_batch.append(content)
            if count >= batch_size:
                count = 0
                # run the batch through the model
                predicts = model_predict(data_batch)
                for p in predicts:
                    yield p
                data_batch = []
        except StopIteration:
            # the last batch may be smaller than batch_size
            if data_batch:
                predicts = model_predict(data_batch)
                for p in predicts:
                    yield p
            break  # without this, next(row) keeps raising StopIteration and the loop never ends

rdd_pair_pred = rdd_pair.repartition(2000).mapPartitions(infer_batch_by_partition)
Approach 2: IterableDataset
An example of an IterableDataset:
import torch

class MyIterableDataset(torch.utils.data.IterableDataset):
    def __init__(self, your_args):
        super(MyIterableDataset).__init__()
        # this depends on your dataset; suppose your dataset contains
        # images whose paths you keep in this list
        self.dataset = your_args  # e.g. a list of image paths

    def __iter__(self):
        for image_path in self.dataset:
            sample, label = read_single_example(image_path)  # read an individual sample and its label
            yield sample, label
# imports needed by the snippets below; softmax is assumed to come from scipy
import json
import torch
from torch.utils.data import DataLoader, IterableDataset
from scipy.special import softmax

def seq_padding(seq, max_len, value=0):
    x = [value] * max_len
    x[:len(seq)] = seq[:max_len]
    return x
class DedupePairDatasetV2Iter(IterableDataset):
    def __init__(self, text_data_iter, tokenizer, max_len=128, max_sen=16, mode='test'):
        super(DedupePairDatasetV2Iter, self).__init__()
        self.max_len = max_len
        self.max_sen = max_sen
        self.tokenizer = tokenizer
        self.text_data_iter = text_data_iter

    def gen_doc_input(self, doc):
        sentences = doc['summary']
        sentences = sentences[:self.max_sen]
        # build the BERT input for each sentence
        input_ids_list, token_type_ids_list, attention_mask_list = [], [], []
        for sen in sentences:
            token_type_ids = [0] * self.max_len
            input_ids, attention_mask = [], []
            for ch in sen[:self.max_len - 2]:
                token_idx = self.tokenizer.encode(ch)
                if len(token_idx) != 3:
                    # whitespace and similar characters map to [UNK]
                    input_ids.append(100)
                else:
                    input_ids.append(token_idx[1])
                attention_mask.append(1)
            input_ids = [101] + input_ids + [102]
            attention_mask = [1] + attention_mask + [1]
            input_ids_list.append(seq_padding(input_ids, self.max_len))
            token_type_ids_list.append(seq_padding(token_type_ids, self.max_len))
            attention_mask_list.append(seq_padding(attention_mask, self.max_len))
        # pad the document up to max_sen sentences
        for i in range(len(input_ids_list), self.max_sen):
            input_ids_list.append([101] + [0] * (self.max_len - 2) + [102])
            token_type_ids_list.append([0] * self.max_len)
            attention_mask_list.append([1] * self.max_len)
        # record the real sentence count; each field is batched separately
        n_sen = min(len(sentences), self.max_sen)
        sen_mask = [1] * n_sen + [0] * (self.max_sen - n_sen)
        inputs = {
            'input_ids': torch.tensor(input_ids_list, dtype=torch.long),
            'token_type_ids': torch.tensor(token_type_ids_list, dtype=torch.long),
            'attention_mask': torch.tensor(attention_mask_list, dtype=torch.long),
            'sen_mask': torch.tensor(sen_mask, dtype=torch.long)
        }
        return inputs

    def __iter__(self):
        for item in self.text_data_iter:
            left = self.gen_doc_input(item['left'])
            right = self.gen_doc_input(item['right'])
            yield left, right, json.dumps(item, ensure_ascii=False)
def infer_duplicate_pair_iter(model,
                              tokenizer,
                              pair_list,
                              batch_size=16,
                              max_sen=16,
                              max_seq_len=128,
                              sent_hidden_size=64):
    device = torch.device('cpu')
    # test_set = DedupePairDatasetV2(pair_list, tokenizer, mode='test', max_sen=max_sen, max_len=max_seq_len)
    test_set = DedupePairDatasetV2Iter(pair_list, tokenizer, mode='test', max_sen=max_sen, max_len=max_seq_len)
    test_params = {'batch_size': batch_size,
                   'shuffle': False,
                   'num_workers': 0,
                   }
    testing_loader = DataLoader(test_set, **test_params)
    model.eval()
    with torch.no_grad():
        for _, inputs in enumerate(testing_loader, 0):
            left, right, items = inputs
            for k in left:
                left[k] = left[k].to(device, dtype=torch.long)
            for k in right:
                right[k] = right[k].to(device, dtype=torch.long)
            logits = model(left, right)
            logits = logits.cpu().detach().numpy()
            score = softmax(logits, axis=1).tolist()
            for pair, pred in zip(items, score):
                pair = json.loads(pair)
                pair['left'].pop('summary', '')
                pair['right'].pop('summary', '')
                pair['score'] = pred[1]
                yield json.dumps(pair, ensure_ascii=False)

def infer_batch_by_partition(partition):
    # model and tokenizer are assumed to be loaded beforehand (e.g. at module level)
    return infer_duplicate_pair_iter(model, tokenizer, partition, batch_size=8)

rdd_pair_pred = rdd_pair.repartition(2000).mapPartitions(infer_batch_by_partition)
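For completeness, here is a minimal sketch of the third approach, which materializes the whole partition into a list and then batches over it; model_predict is the same hypothetical scoring function used in approach 1:
def infer_by_materializing(partition):
    rows = list(partition)  # beware: the whole partition is held in memory
    batch_size = 16
    for i in range(0, len(rows), batch_size):
        for p in model_predict(rows[i:i + batch_size]):
            yield p

rdd_pair_pred = rdd_pair.repartition(2000).mapPartitions(infer_by_materializing)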
Basic operations
Reading JSON
from pyspark.sql import SparkSession
spark = SparkSession.builder.config("spark.sql.warehouse.dir", "file:///C:/temp").appName("readJSON").getOrCreate()
readJSONDF = spark.read.json('Simple.json')
readJSONDF.show(truncate=False)
Reading CSV
from pyspark.sql.types import StringType

df = spark.read.csv(hdfs_path, header=True, inferSchema="true")
df = df.select('doc_id', 'title')
df = df.withColumnRenamed('title', 'doc_title')
df = df.withColumn('doc_id', df['doc_id'].cast(StringType()))
Writing to a Hive partition
# overwrite only the specified partition instead of rewriting the whole table
df.createOrReplaceTempView('tmp_push')
query = 'INSERT OVERWRITE TABLE aaa.bbb PARTITION(date="${date}") SELECT * FROM tmp_push'
spark.sql(query)
Union of multiple RDDs
rdd_list = []
rdd_list.append(rdd1)
rdd_list.append(rdd2)
rdd_union = sc.union(rdd_list)
join
Use DataFrames whenever you can; the DataFrame API benefits from built-in query optimization.
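A minimal DataFrame join sketch (the DataFrames and the key column are hypothetical); explicitly broadcasting a known-small side is a common further optimization:
from pyspark.sql import functions as F

df_joined = df_a.join(df_b, on=['doc_id'], how='inner')
# if df_small is known to be small, broadcast it to avoid a shuffle
df_joined_bc = df_a.join(F.broadcast(df_small), on=['doc_id'], how='left')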
left_anti
Only the DataFrame API provides this:
df_a_not_in_b = df_a.join(df_b, on=['key'], how='left_anti')
udf
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

def make_pair_key(id1, id2):
    id1, id2 = str(id1), str(id2)
    if id1 > id2:
        id1, id2 = id2, id1
    return id1 + '-' + id2

udf_make_pair_key = F.udf(make_pair_key, StringType())
df = df.withColumn('key', udf_make_pair_key(df.id1, df.id2))
Tricks
Installing dependencies at runtime
Some packages are awkward to install, for example spaCy's model zh_core_web_md. The normal installation is:
pip install -U pip setuptools wheel
pip install -U spacy
python -m spacy download zh_core_web_md
The last step cannot go into requirements.txt. One hack is to upload the model file and unzip it in code, which I have verified works:
def load_spacy_model(env='local'):
    import zipfile
    import spacy

    def unzip_model(zip_file, out_path='.'):
        zFile = zipfile.ZipFile(zip_file, "r")
        for fileM in zFile.namelist():
            zFile.extract(fileM, out_path)
        zFile.close()

    zip_file = 'zh_core_web_md-3.0.0.zip'
    out_path = '.'
    unzip_model(zip_file, out_path)
    NLP = spacy.load(out_path + '/zh_core_web_md-3.0.0')
But it does not actually need to be that complicated; a better approach is to load the model directly:
from pyspark.sql import SparkSession
import os
import sys
import zipfile

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

def test_load_model():
    sys.path.insert(0, 'zh_core_web_md-3.0.0.zip')  # shipped via --py-files, added as a path entry
    files = os.listdir('./')
    print('x' * 60, file=sys.stderr)
    print('\n'.join(files), file=sys.stderr)
    import spacy
    nlp = spacy.load('zh_core_web_md-3.0.0.zip/zh_core_web_md-3.0.0')
    text = '风寒三友即岁寒三友,指松、竹、梅三种植物。'
    doc = nlp(text)
    for token in doc:
        # info = [token.text, token.lemma_, token.pos_, token.tag_, token.dep_,
        #         token.shape_, token.is_alpha, token.is_stop]
        print(token.text, file=sys.stdout)
The contents of zh_core_web_md-3.0.0 are:
accuracy.json
attribute_ruler
config.cfg
meta.json
ner
parser
senter
tagger
tok2vec
tokenizer
vocab