vPC Consistency Check Script

The latest version of the script can be downloaded from https://github.com/tecdct2941

The following code snippet captures the various vPC configurations on which it is desirable to run consistency checks:

  type_tbl = {  
        'Interface type' : '1',  
    'LACP Mode'              : '1',  
    'STP Port Guard' : '1',  
            'STP Port Type' : '1',  
     'Speed'              : '1',  
        'Duplex'             : '1',  
        'MTU'              : '1',  
          'Port Mode'       : '1',  
   'STP MST Simulate PVST' : '1',  
     'Native Vlan'       : '1',  
         'Pvlan list'       : '2',  
          'Admin port mode' : '1',  
   'lag-id' : '1',  
    'mode' : '1',  
      'vPC card type'     : '1',  
         'Allowed VLANs'      : '-',  
        'Local error VLANs' : '-'  
  }   

The following section uses NX-API CLI to gather the vPC operational state from a switch:

  def check_vpc_status(switch_x):  
  
            switch_ip = switch_x["mgmt_ip"]  
          switchuser = switch_x["username"]  
        switchpassword = switch_x["user_pw"]  
     return_data = {}  
   url = "https://"+switch_ip+"/ins"  
  
        myheaders={'content-type':'application/json-rpc'}  
          payload=[  
            {  
            "jsonrpc": "2.0",  
              "method": "cli",  
       "params": {  
                "cmd": "show vpc brief",  
               "version": 1.2  
         },  
         "id": 1  
        }  
        ]  
  
   response = requests.post(url,data=json.dumps(payload), headers=myheaders,auth=(switchuser,switchpassword)).json()  
  
   resp_body = response['result']['body']  
     return_data['vpc-peer-status'] = resp_body['vpc-peer-status']  
      resp_vpc_table = resp_body['TABLE_vpc']['ROW_vpc']  
  
          for iter in resp_vpc_table:  
  
                 one_vpc_id = iter['vpc-id']  
                vpc_id.append(one_vpc_id)  
  
           return_data[str(one_vpc_id)] = {}  
                  return_data[str(one_vpc_id)]['consistency-status'] = iter['vpc-consistency-status']  
                return_data[str(one_vpc_id)]['port-id'] = iter['vpc-ifindex']  
  
               return return_data  

The following section gathers vPC consistency state across the two switches:

  def check_vpc_consistency(switch_x,switch_y, vpc_id):  
  
      cmd = "show vpc consistency-parameters vpc " + str(vpc_id) + " errors"  
         switch_ip = switch_x["mgmt_ip"]  
          switchuser = switch_x["username"]  
        switchpassword = switch_x["user_pw"]  
     return_data = {}  
   url = "https://"+switch_ip+"/ins"  
  
        myheaders={'content-type':'application/json-rpc'}  
  
   payload=[  
            {  
            "jsonrpc": "2.0",  
              "method": "cli",  
       "params": {  
                "cmd": cmd,  
              "version": 1.2  
         },  
         "id": 1  
        }  
        ]  
  
   response = requests.post(url,data=json.dumps(payload), headers=myheaders,auth=(switchuser,switchpassword)).json()  
  
   resp_body = response['result']['body']['TABLE_vpc_consistency']['ROW_vpc_consistency']          fail_list = []  
  
      for iter in resp_body:  
             i_name = iter['vpc-param-name']  
            i_type = iter['vpc-param-type']  
            i_local = iter['vpc-param-local-val']  
              i_peer = iter['vpc-param-peer-val']  
  
                 if i_local != i_peer:  
                      print "Found Inconsistency in " + i_name + ": local val: " + i_local + " peer val : " + i_peer  
                       fail_list.append(iter)  
  
      return fail_list  

The following section corrects any inconsistencies identified:

  def correct_vpc_consistency(switch_x, switch_y,fail_list, vpc_id,port_id):  
  
        print "Correcting VPC " + str(vpc_id) + "\n"  
   switch_ip = ""  
   switchuser = ""  
          switchpassword = ""  
  
       for iter in fail_list:  
             if iter['vpc-param-name'] == "MTU":  
                      print "Correcting MTU"  
                   higher_mtu = ""  
                          if iter['vpc-param-local-val'] < iter['vpc-param-peer-val']:  
                            higher_mtu = iter["vpc-param-peer-val"]  
                                  switch_ip = switch_x["mgmt_ip"]  
                                  switchuser = switch_x["username"]  
                                switchpassword = switch_x["user_pw"]  
                     else:  
                              higher_mtu = iter["vpc-param-local-val"]  
                                 switch_ip = switch_y["mgmt_ip"]  
                                  switchuser = switch_y["username"]  
                                switchpassword = switch_y["user_pw"]  
  
              payload=[  
                    {  
                    "jsonrpc": "2.0",  
                      "method": "cli",  
               "params": {  
                        "cmd": "conf t",  
                       "version": 1.2  
                 },  
                 "id": 1  
                },  
                 {  
                    "jsonrpc": "2.0",  
                      "method": "cli",  
               "params": {  
                        "cmd": "interface " + port_id,  
                 "version": 1.2  

                    },  
                 "id": 2  
                },  
                 {  
                    "jsonrpc": "2.0",  
                      "method": "cli",  
               "params": {  
                        "cmd": "mtu " + str(higher_mtu),  
                       "version": 1.2  
                 },  
                 "id": 3  
                }  
                ]  
  
           url = "https://"+switch_ip+"/ins"  
               myheaders={'content-type':'application/json-rpc'}  
                  response = requests.post(url,data=json.dumps(payload), headers=myheaders,auth=(switchuser,switchpassword)).json()  

Finally, the control logic for the entire script:

  def main():  
  
        print "**** Calling vlan consistency checker ***"  
        consistent1 = check_vpc_status(switch_a)  
  
    print "Checking VPC status. Results:\n "  
         is_consistent = 1  
  
   for one_vpc_id in vpc_id:  
                  if consistent1[str(one_vpc_id)]['consistency-status'] == "INVALID":  
                      is_consistent = 0  
                          print "VPC " + str(one_vpc_id) + " is inconsistent - checking reason...\n"  
                     cons_check = check_vpc_consistency(switch_a, switch_b, one_vpc_id)  
                         correct_it = correct_vpc_consistency(switch_a, switch_b,cons_check, one_vpc_id,consistent1[str(one_vpc_id)]['port-id'] )  
                   print "VPC" + str(one_vpc_id) + " corrected"  
  
    time.sleep(5)  
  
       if is_consistent == 0:  
             print "Rechecking VPC Status after correction. Results: \n"  
              consistent_recheck = check_vpc_status(switch_a)  
  
             for one_vpc_id in vpc_id:  
                          if consistent_recheck[str(one_vpc_id)]['consistency-status'] == "INVALID":  
                               print "VPC " + str(one_vpc_id) + " is still inconsistent"  
      else:  
              print "No Inconsistency found !! \n"