import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.utils import check_random_state, check_array
from sklearn.metrics.pairwise import rbf_kernel
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
# k-NN-style label propagation demo on Iris using an RBF similarity kernel.
# Load Iris and hold out 30% for evaluation (the split is unseeded, so the
# printed accuracy varies from run to run).
X, y = datasets.load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)
nClass = len(np.unique(y))

# Z: one-hot label matrix for the training samples — row i has a 1 in the
# column of sample i's class. (Vectorized form of the per-sample loop; each
# row is written exactly once, so `= 1` matches the original `+= 1`.)
Z = np.zeros((len(X_train), nClass))
Z[np.arange(len(y_train)), y_train] = 1

# K[i, j]: RBF similarity between test sample i and training sample j.
K = rbf_kernel(X_test, X_train)

# Per test sample, indices of the n_neighbors most similar training samples.
# argpartition avoids a full sort; the selected columns are unordered, which
# is fine since we only sum over them.
n_neighbors = 4
indices = np.argpartition(K, -n_neighbors, axis=1)[:, -n_neighbors:]

# P[i, c]: similarity-weighted vote of test sample i's neighbours for class c.
P = np.empty((len(X_test), nClass))
for i in range(len(X_test)):
    P[i, :] = K[i, indices[i]] @ Z[indices[i], :]

# Normalize each row to a probability distribution; rows with zero total
# similarity fall back to the uniform distribution.
normalizer = np.sum(P, axis=1)
P[normalizer > 0] /= normalizer[normalizer > 0, np.newaxis]
P[normalizer == 0, :] = [1 / nClass] * nClass

# Predict the class with the highest propagated probability and report accuracy.
y_pred = np.argmax(P, axis=1)
acc = accuracy_score(y_test, y_pred)
print("ACC::", acc)
The snippet above is only a standalone throwaway demo; the version actually used inside the project class follows:
# NOTE(review): this span was garbled by OCR/translation (`Self`, `seld`,
# `Len`, `OrdereDDICT`, `for, idx`, unbalanced parentheses) and did not parse.
# Reconstructed into valid Python by mirroring the standalone demo above;
# confirm against the original project source. Runs as a method body: it
# reads self.abslabeled / self.intlabeled / self.tmp_selected / self.x /
# self.y / self.xin / self.labels / self.nClass.
tmp_train_ids = self.abslabeled + self.intlabeled
num_abslabeled = len(self.abslabeled)

# --------- Z stores each training sample's class attribution ---------
# Absolutely-labeled samples get a one-hot row; interval-labeled samples
# contribute their stored per-label probabilities.
Z = np.zeros((len(tmp_train_ids), self.nClass))
for i, idx in enumerate(tmp_train_ids):
    if i < num_abslabeled:
        Z[i, self.y[idx]] += 1
    else:
        for lab in self.xin[idx].inter:
            Z[i, lab] = self.xin[idx].prob[lab]

# ---- K: similarity matrix (RBF / Gaussian kernel), test rows vs train cols ----
tmp_test_ids = self.intlabeled + self.tmp_selected
K = rbf_kernel(self.x[tmp_test_ids], self.x[tmp_train_ids])

# Clamp the neighbour count to the number of available training samples.
# NOTE(review): the garbled original read `if n_neighbors < len(...)`, which
# would make the clamp a no-op; `>` matches the evident intent — confirm.
n_neighbors = self.nClass
if n_neighbors > len(tmp_train_ids):
    n_neighbors = len(tmp_train_ids)

# ------------ indices of the nearest neighbours per test sample ------------
indices = np.argpartition(K, -n_neighbors, axis=1)[:, -n_neighbors:]

# P[i, c]: similarity-weighted neighbour vote for class c.
P = np.empty((len(tmp_test_ids), self.nClass))
for i, idx in enumerate(tmp_test_ids):
    P[i, :] = K[i, indices[i]] @ Z[indices[i], :]

# Normalize rows to probability distributions; rows with zero total
# similarity fall back to the uniform distribution.
normalizer = np.sum(P, axis=1)
P[normalizer > 0] /= normalizer[normalizer > 0, np.newaxis]
P[normalizer == 0, :] = [1 / self.nClass] * self.nClass

# ---------- write the updated probability values back into storage ----------
for i, idx in enumerate(tmp_test_ids):
    self.xin[idx].prob = OrderedDict()
    for lab in self.xin[idx].inter:
        self.xin[idx].prob[lab] = P[i, :][self.labels.index(lab)]
    print("Sample: {} Probability distribution: {}".format(idx, self.xin[idx].prob))

# ---- assign each test sample the label with maximum posterior probability ----
for idx in tmp_test_ids:
    self.xin[idx].evalab = max(self.xin[idx].prob, key=self.xin[idx].prob.get)