An Implementation of the Microsoft Agent Governance Toolkit for Safe AI Agent Tool Use with Policies, Approvals, Audit Logs, and Risk Controls

An Implementation of the Microsoft Agent Governance Toolkit for Safe AI Agent Tool Use with Policies, Approvals, Audit Logs, and Risk Controls


scenarios = [
{
“name”: “Safe database read”,
“tool”: research_db,
“kwargs”: {
“table”: “customers”,
“operation”: “select”,
“type”: “select”,
“sensitivity”: “medium”
}
},
{
“name”: “Blocked destructive database action”,
“tool”: research_db,
“kwargs”: {
“table”: “customers”,
“operation”: “drop”,
“type”: “drop_table”,
“sensitivity”: “critical”
}
},
{
“name”: “External email requiring approval”,
“tool”: research_email,
“kwargs”: {
“to”: “[email protected]”,
“recipient_domain”: “example.com”,
“subject”: “Quarterly update”,
“body”: “Sharing a non-confidential quarterly update.”,
“type”: “send_email”,
“sensitivity”: “medium”
}
},
{
“name”: “External email denied due to approval rejection”,
“tool”: research_email,
“kwargs”: {
“to”: “[email protected]”,
“recipient_domain”: “example.com”,
“subject”: “Confidential strategy”,
“body”: “This contains confidential strategy.”,
“type”: “send_email”,
“sensitivity”: “critical”
}
},
{
“name”: “Safe sandbox shell command”,
“tool”: ops_shell,
“kwargs”: {
“command”: “echo Agent governance is active”,
“type”: “shell_exec”,
“sensitivity”: “low”
}
},
{
“name”: “Dangerous shell command blocked”,
“tool”: ops_shell,
“kwargs”: {
“command”: “rm -rf /content/something”,
“type”: “shell_exec”,
“sensitivity”: “critical”
}
},
{
“name”: “Low-trust agent blocked from sensitive data”,
“tool”: shadow_db,
“kwargs”: {
“table”: “executive_compensation”,
“operation”: “select”,
“type”: “select”,
“sensitivity”: “critical”
}
},
{
“name”: “Financial transfer requiring approval”,
“tool”: finance_transfer,
“kwargs”: {
“amount”: 2500,
“destination”: “vendor-123”,
“type”: “transfer_money”,
“sensitivity”: “high”
}
},
{
“name”: “Large financial transfer rejected”,
“tool”: finance_transfer,
“kwargs”: {
“amount”: 15000,
“destination”: “vendor-999”,
“type”: “transfer_money”,
“sensitivity”: “critical”
}
},
]
results = []
for scenario in scenarios:
try:
output = scenario[“tool”](**scenario[“kwargs”])
results.append({
“scenario”: scenario[“name”],
“status”: “executed”,
“output”: output
})
except Exception as e:
results.append({
“scenario”: scenario[“name”],
“status”: “blocked_or_pending”,
“error”: str(e)
})
audit_df = audit_log.to_dataframe()
display_cols = [
“timestamp”,
“agent_name”,
“tool_name”,
“decision”,
“matched_rule”,
“severity”,
“reason”,
“record_hash”
]
display(audit_df[display_cols])
test_cases = [
{
“name”: “drop_table must be denied”,
“identity”: research_agent,
“tool_name”: “query_database”,
“action”: {“type”: “drop_table”, “sensitivity”: “critical”, “autonomous”: True},
“expected”: “deny”
},
{
“name”: “safe select should be allowed”,
“identity”: research_agent,
“tool_name”: “query_database”,
“action”: {“type”: “select”, “sensitivity”: “low”, “autonomous”: True},
“expected”: “allow”
},
{
“name”: “external email should require approval”,
“identity”: research_agent,
“tool_name”: “send_email”,
“action”: {
“type”: “send_email”,
“recipient_domain”: “example.com”,
“sensitivity”: “medium”,
“autonomous”: True
},
“expected”: “require_approval”
},
{
“name”: “low trust sensitive access denied”,
“identity”: unknown_agent,
“tool_name”: “query_database”,
“action”: {“type”: “select”, “sensitivity”: “critical”, “autonomous”: True},
“expected”: “deny”
},
{
“name”: “shell command should enter sandbox”,
“identity”: ops_agent,
“tool_name”: “shell_exec”,
“action”: {
“type”: “shell_exec”,
“command”: “echo hello”,
“sensitivity”: “low”,
“autonomous”: True
},
“expected”: “sandbox”
},
]
test_results = []
for test in test_cases:
decision = engine.evaluate(
identity=test[“identity”],
tool_name=test[“tool_name”],
action=test[“action”]
)
passed = decision.decision == test[“expected”]
test_results.append({
“test”: test[“name”],
“expected”: test[“expected”],
“actual”: decision.decision,
“passed”: passed,
“matched_rule”: decision.matched_rule
})
test_df = pd.DataFrame(test_results)
display(test_df)
engine.activate_kill_switch()
try:
research_db(
table=”customers”,
operation=”select”,
type=”select”,
sensitivity=”low”
)
except Exception as e:
pass
engine.deactivate_kill_switch()
audit_df = audit_log.to_dataframe()
summary = (
audit_df
.groupby([“decision”, “severity”], dropna=False)
.size()
.reset_index(name=”count”)
.sort_values(“count”, ascending=False)
)
display(summary)
agent_summary = (
audit_df
.groupby([“agent_name”, “decision”])
.size()
.reset_index(name=”count”)
.sort_values([“agent_name”, “count”], ascending=[True, False])
)
display(agent_summary)
decision_counts = audit_df[“decision”].value_counts()
plt.figure(figsize=(8, 5))
decision_counts.plot(kind=”bar”)
plt.title(“Governance Decisions Across Agent Actions”)
plt.xlabel(“Decision”)
plt.ylabel(“Count”)
plt.xticks(rotation=30)
plt.tight_layout()
plt.show()
severity_counts = audit_df[“severity”].fillna(“none”).value_counts()
plt.figure(figsize=(8, 5))
severity_counts.plot(kind=”bar”)
plt.title(“Governance Events by Severity”)
plt.xlabel(“Severity”)
plt.ylabel(“Count”)
plt.xticks(rotation=30)
plt.tight_layout()
plt.show()
G = nx.DiGraph()
for _, row in audit_df.iterrows():
agent_node = f”Agent: {row[‘agent_name’]}”
tool_node = f”Tool: {row[‘tool_name’]}”
decision_node = f”Decision: {row[‘decision’]}”
rule_node = f”Rule: {row[‘matched_rule’]}” if pd.notna(row[“matched_rule”]) else “Rule: default”
G.add_node(agent_node, node_type=”agent”)
G.add_node(tool_node, node_type=”tool”)
G.add_node(decision_node, node_type=”decision”)
G.add_node(rule_node, node_type=”rule”)
G.add_edge(agent_node, tool_node, relation=”calls”)
G.add_edge(tool_node, decision_node, relation=”produces”)
G.add_edge(decision_node, rule_node, relation=”matched”)
plt.figure(figsize=(14, 9))
pos = nx.spring_layout(G, seed=42, k=0.8)
nx.draw_networkx_nodes(G, pos, node_size=1800)
nx.draw_networkx_edges(G, pos, arrows=True, arrowstyle=”->”, arrowsize=15)
nx.draw_networkx_labels(G, pos, font_size=8)
plt.title(“Agent Governance Graph: Agents, Tools, Decisions, and Policy Rules”)
plt.axis(“off”)
plt.tight_layout()
plt.show()
EXPORT_DIR = “/content/agt_tutorial_outputs”
os.makedirs(EXPORT_DIR, exist_ok=True)
audit_json_path = os.path.join(EXPORT_DIR, “tamper_evident_audit_log.json”)
audit_csv_path = os.path.join(EXPORT_DIR, “governance_audit_log.csv”)
policy_copy_path = os.path.join(EXPORT_DIR, “advanced_agent_policy.yaml”)
test_results_path = os.path.join(EXPORT_DIR, “policy_test_results.csv”)
with open(audit_json_path, “w”) as f:
json.dump([asdict(r) for r in audit_log.records], f, indent=2, default=str)
audit_df.to_csv(audit_csv_path, index=False)
test_df.to_csv(test_results_path, index=False)
shutil.copy(POLICY_PATH, policy_copy_path)



Source link

Leave a Reply

Your email address will not be published. Required fields are marked *

Pin It on Pinterest