#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import math

import numpy as np
import pandas as pd

from pyspark import keyword_only
from pyspark.ml.common import inherit_doc
from pyspark.ml.connect.base import PredictionModel, Predictor, _PredictorParams
from pyspark.ml.connect.io_utils import CoreModelReadWrite, ParamsReadWrite
from pyspark.ml.param.shared import (
    HasBatchSize,
    HasFitIntercept,
    HasLearningRate,
    HasMaxIter,
    HasMomentum,
    HasNumTrainWorkers,
    HasProbabilityCol,
    HasSeed,
    HasTol,
    HasWeightCol,
)
from pyspark.ml.torch.distributor import TorchDistributor
from pyspark.sql import DataFrame
from pyspark.sql import functions as sf


class _LogisticRegressionParams(
    _PredictorParams,
    HasMaxIter,
    HasFitIntercept,
    HasTol,
    HasWeightCol,
    HasNumTrainWorkers,
    HasBatchSize,
    HasLearningRate,
    HasMomentum,
    HasProbabilityCol,
    HasSeed,
):
    """
    Params for :py:class:`LogisticRegression` and :py:class:`LogisticRegressionModel`.

    .. versionadded:: 3.5.0
    """

    def __init__(self, *args: Any):
        super(_LogisticRegressionParams, self).__init__(*args)
        self._setDefault(
            maxIter=100,
            tol=1e-6,
            batchSize=32,
            learningRate=0.001,
            momentum=0.9,
            seed=0,
        )


def _train_logistic_regression_model_worker_fn(
    num_samples_per_worker: int,
    num_features: int,
    batch_size: int,
    max_iter: int,
    num_classes: int,
    learning_rate: float,
    momentum: float,
    fit_intercept: bool,
    seed: int,
) -> Any:
    from pyspark.ml.torch.distributor import _get_spark_partition_data_loader
    import torch
    import torch.nn as torch_nn
    from torch.nn.parallel import DistributedDataParallel as DDP
    import torch.distributed
    import torch.optim as optim

    # Seed torch on every worker so that model initialization is reproducible.
    torch.manual_seed(seed)

    # TODO: support training on GPU
    # TODO: support L1 / L2 regularization
    torch.distributed.init_process_group("gloo")

    linear_model = torch_nn.Linear(
        num_features, num_classes, bias=fit_intercept, dtype=torch.float32
    )
    ddp_model = DDP(linear_model)

    loss_fn = torch_nn.CrossEntropyLoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=learning_rate, momentum=momentum)
    data_loader = _get_spark_partition_data_loader(
        num_samples_per_worker,
        batch_size,
        num_workers=0,
        prefetch_factor=None,  # type: ignore
    )
    for i in range(max_iter):
        ddp_model.train()

        step_count = 0
        loss_sum = 0.0
        for x, target in data_loader:
            optimizer.zero_grad()
            output = ddp_model(x.to(torch.float32))
            loss = loss_fn(output, target.to(torch.long))
            loss.backward()
            loss_sum += loss.detach().numpy()
            optimizer.step()
            step_count += 1

        # TODO: early stopping
        #  When each epoch ends, compute the loss on a validation dataset and
        #  compare it with the previous epoch's validation loss; if the change
        #  is less than the provided `tol`, stop training.

        if torch.distributed.get_rank() == 0:
            print(
                f"Progress: train epoch {i + 1} completes, train loss = {loss_sum / step_count}"
            )

    if torch.distributed.get_rank() == 0:
        return ddp_model.module.state_dict()

    return None
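
# A minimal single-process sketch of what the worker function above trains
# (illustrative only, using made-up synthetic data; Spark data loading, DDP,
# and the distributed process group are omitted): a `torch.nn.Linear` layer
# optimized against `CrossEntropyLoss` with SGD is a multinomial logistic
# regression.
def _example_local_logistic_regression_sketch() -> Any:
    import torch
    import torch.nn as torch_nn
    import torch.optim as optim

    torch.manual_seed(0)
    # Assumed toy data: 4 samples, 2 features, 2 classes.
    x = torch.tensor([[1.0, 2.0], [2.0, -1.0], [-3.0, -2.0], [-1.0, -2.0]])
    target = torch.tensor([1, 1, 0, 0])

    model = torch_nn.Linear(2, 2, bias=True, dtype=torch.float32)
    loss_fn = torch_nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

    for _ in range(20):
        optimizer.zero_grad()
        loss = loss_fn(model(x), target)
        loss.backward()
        optimizer.step()

    # This plays the role of the `state_dict` that rank 0 returns
    # from the distributed worker function above.
    return model.state_dict()
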

@inherit_doc
class LogisticRegression(
    Predictor["LogisticRegressionModel"], _LogisticRegressionParams, ParamsReadWrite
):
    """
    Logistic regression estimator.

    .. versionadded:: 3.5.0

    Examples
    --------
    >>> from pyspark.ml.connect.classification import LogisticRegression, LogisticRegressionModel
    >>> lor = LogisticRegression(maxIter=20, learningRate=0.01)
    >>> dataset = spark.createDataFrame([
    ...     ([1.0, 2.0], 1),
    ...     ([2.0, -1.0], 1),
    ...     ([-3.0, -2.0], 0),
    ...     ([-1.0, -2.0], 0),
    ... ], schema=['features', 'label'])
    >>> lor_model = lor.fit(dataset)
    >>> transformed_dataset = lor_model.transform(dataset)
    >>> transformed_dataset.show()
    +------------+-----+----------+--------------------+
    |    features|label|prediction|         probability|
    +------------+-----+----------+--------------------+
    |  [1.0, 2.0]|    1|         1|[0.02423273026943...|
    | [2.0, -1.0]|    1|         1|[0.09334788471460...|
    |[-3.0, -2.0]|    0|         0|[0.99808156490325...|
    |[-1.0, -2.0]|    0|         0|[0.96210002899169...|
    +------------+-----+----------+--------------------+
    >>> lor_model.saveToLocal("/tmp/lor_model")
    >>> LogisticRegressionModel.loadFromLocal("/tmp/lor_model")
    LogisticRegression_...
    """

    _input_kwargs: Dict[str, Any]

    @keyword_only
    def __init__(
        self,
        *,
        featuresCol: str = "features",
        labelCol: str = "label",
        predictionCol: str = "prediction",
        probabilityCol: str = "probability",
        maxIter: int = 100,
        tol: float = 1e-6,
        numTrainWorkers: int = 1,
        batchSize: int = 32,
        learningRate: float = 0.001,
        momentum: float = 0.9,
        seed: int = 0,
    ):
        """
        __init__(
            self,
            *,
            featuresCol: str = "features",
            labelCol: str = "label",
            predictionCol: str = "prediction",
            probabilityCol: str = "probability",
            maxIter: int = 100,
            tol: float = 1e-6,
            numTrainWorkers: int = 1,
            batchSize: int = 32,
            learningRate: float = 0.001,
            momentum: float = 0.9,
            seed: int = 0,
        )
        """
        super(LogisticRegression, self).__init__()
        kwargs = self._input_kwargs
        self._set(**kwargs)

    def _fit(self, dataset: Union[DataFrame, pd.DataFrame]) -> "LogisticRegressionModel":
        import torch
        import torch.nn as torch_nn

        if isinstance(dataset, pd.DataFrame):
            # TODO: support pandas dataframe fitting
            raise NotImplementedError("Fitting pandas dataframe is not supported yet.")

        num_train_workers = self.getNumTrainWorkers()
        batch_size = self.getBatchSize()

        # We don't need to persist the dataset: the shuffle output of the
        # repartition is cached by Spark, so re-reading the partitions during
        # training does not recompute the upstream plan.
        dataset = dataset.select(self.getFeaturesCol(), self.getLabelCol()).repartition(
            num_train_workers
        )

        num_rows, num_features, classes = dataset.select(
            sf.count(sf.lit(1)),
            sf.first(sf.array_size(self.getFeaturesCol())),
            sf.collect_set(self.getLabelCol()),
        ).head()  # type: ignore[misc]

        num_classes = len(classes)
        if num_classes < 2:
            raise ValueError("Training dataset must contain at least 2 distinct labels.")
        if any(c not in range(0, num_classes) for c in classes):
            raise ValueError("Training labels must be integers in [0, numClasses).")

        num_batches_per_worker = math.ceil(num_rows / num_train_workers / batch_size)
        num_samples_per_worker = num_batches_per_worker * batch_size

        # TODO: support GPU.
        distributor = TorchDistributor(
            local_mode=False, use_gpu=False, num_processes=num_train_workers
        )
        model_state_dict = distributor._train_on_dataframe(
            _train_logistic_regression_model_worker_fn,
            dataset,
            num_samples_per_worker=num_samples_per_worker,
            num_features=num_features,
            batch_size=batch_size,
            max_iter=self.getMaxIter(),
            num_classes=num_classes,
            learning_rate=self.getLearningRate(),
            momentum=self.getMomentum(),
            fit_intercept=self.getFitIntercept(),
            seed=self.getSeed(),
        )

        dataset.unpersist()

        torch_model = torch_nn.Linear(
            num_features, num_classes, bias=self.getFitIntercept(), dtype=torch.float32
        )
        torch_model.load_state_dict(model_state_dict)

        lor_model = LogisticRegressionModel(
            torch_model, num_features=num_features, num_classes=num_classes
        )
        lor_model._resetUid(self.uid)
        return self._copyValues(lor_model)
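
# Illustrative arithmetic for the per-worker sample sizing computed in
# `LogisticRegression._fit` above (the numbers here are made up): each worker
# rounds its share of rows up to a whole number of batches, so that every
# worker runs the same number of optimizer steps per epoch and DDP's
# collective gradient averaging stays in sync.
def _example_num_samples_per_worker(
    num_rows: int = 100, num_train_workers: int = 4, batch_size: int = 32
) -> int:
    # ceil(100 / 4 / 32) == ceil(0.78125) == 1 batch per worker
    num_batches_per_worker = math.ceil(num_rows / num_train_workers / batch_size)
    # 1 * 32 == 32; the data loader is asked for exactly this many samples.
    return num_batches_per_worker * batch_size
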

@inherit_doc
class LogisticRegressionModel(
    PredictionModel, _LogisticRegressionParams, ParamsReadWrite, CoreModelReadWrite
):
    """
    Model fitted by LogisticRegression.

    .. versionadded:: 3.5.0
    """

    def __init__(
        self,
        torch_model: Any = None,
        num_features: Optional[int] = None,
        num_classes: Optional[int] = None,
    ):
        super().__init__()
        self.torch_model = torch_model
        self.num_features = num_features
        self.num_classes = num_classes

    @property
    def numFeatures(self) -> int:
        return self.num_features  # type: ignore[return-value]

    @property
    def numClasses(self) -> int:
        return self.num_classes  # type: ignore[return-value]

    def _input_columns(self) -> List[str]:
        return [self.getOrDefault(self.featuresCol)]

    def _output_columns(self) -> List[Tuple[str, str]]:
        output_cols = [(self.getOrDefault(self.predictionCol), "bigint")]
        prob_col = self.getOrDefault(self.probabilityCol)
        if prob_col:
            output_cols += [(prob_col, "array<double>")]
        return output_cols

    def _get_transform_fn(self) -> Callable[["pd.Series"], Any]:
        import torch
        import torch.nn as torch_nn

        model_state_dict = self.torch_model.state_dict()
        num_features = self.num_features
        num_classes = self.num_classes
        fit_intercept = self.getFitIntercept()

        def transform_fn(input_series: Any) -> Any:
            torch_model = torch_nn.Linear(
                num_features,  # type: ignore[arg-type]
                num_classes,  # type: ignore[arg-type]
                bias=fit_intercept,
                dtype=torch.float32,
            )
            # TODO: Use a Spark broadcast for `model_state_dict`;
            #  it can improve performance when the model is large.
            torch_model.load_state_dict(model_state_dict)

            input_array = np.stack(input_series.values)
            with torch.inference_mode():
                result = torch_model(torch.tensor(input_array, dtype=torch.float32))
            predictions = torch.argmax(result, dim=1).numpy()

            if self.getProbabilityCol():
                probabilities = torch.softmax(result, dim=1).numpy()
                return pd.DataFrame(
                    {
                        self.getPredictionCol(): list(predictions),
                        self.getProbabilityCol(): list(probabilities),
                    },
                    index=input_series.index.copy(),
                )
            else:
                return pd.Series(data=list(predictions), index=input_series.index.copy())

        return transform_fn

    def _get_core_model_filename(self) -> str:
        return self.__class__.__name__ + ".torch"

    def _save_core_model(self, path: str) -> None:
        import torch
        import torch.nn as torch_nn

        # Persist the model with a trailing softmax so that the saved artifact
        # outputs probabilities directly.
        lor_torch_model = torch_nn.Sequential(
            self.torch_model,
            torch_nn.Softmax(dim=1),
        )
        torch.save(lor_torch_model, path)

    def _load_core_model(self, path: str) -> None:
        import torch

        lor_torch_model = torch.load(path)
        # Keep only the linear layer; the softmax is re-applied at inference time.
        self.torch_model = lor_torch_model[0]

    def _get_extra_metadata(self) -> Dict[str, Any]:
        return {
            "num_features": self.num_features,
            "num_classes": self.num_classes,
        }

    def _load_extra_metadata(self, extra_metadata: Dict[str, Any]) -> None:
        """
        Load the extra metadata attributes from the extra metadata JSON object.
        """
        self.num_features = extra_metadata["num_features"]
        self.num_classes = extra_metadata["num_classes"]
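
# Minimal local sketch of the inference path above (assumed toy weights and
# data; `_get_transform_fn` is internal, so this is illustrative rather than
# public API): wrap a plain torch Linear layer in a LogisticRegressionModel
# and apply the pandas transform function directly, without a SparkSession.
def _example_local_inference() -> "pd.DataFrame":
    import torch
    import torch.nn as torch_nn

    torch.manual_seed(0)
    torch_model = torch_nn.Linear(2, 2, bias=True, dtype=torch.float32)
    model = LogisticRegressionModel(torch_model, num_features=2, num_classes=2)

    transform_fn = model._get_transform_fn()
    features = pd.Series([np.array([1.0, 2.0]), np.array([-3.0, -2.0])])
    # Returns a pandas DataFrame with "prediction" and "probability" columns.
    return transform_fn(features)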